# app.py import os import time import math import random from datetime import datetime import streamlit as st import pandas as pd import plotly.graph_objects as go from openai import OpenAI # ---------------------------- # ページ設定 # ---------------------------- st.set_page_config( page_title="軌道制御支援AIシステム(PoC開発中)", page_icon="🛰️", layout="wide" ) # ---------------------------- # OpenAI client # ---------------------------- api_key = os.getenv("OPENAI_API_KEY") client = OpenAI(api_key=api_key) if api_key else None # ---------------------------- # 初期状態 # ---------------------------- if "sat_state" not in st.session_state: st.session_state.sat_state = { "angle_deg": 20.0, # 軌道位置角 "altitude_km": 540.0, "speed_kms": 7.66, "battery_pct": 92, "temp_c": 24.5, "signal_db": 81, "mode": "IDLE", "health": "NOMINAL", "last_event": "初期化完了", # 追加: レンジング / 姿勢 "range_km": 1240.0, # 地上局から衛星までの距離 "pos_x": 420.0, # 軌道面上の相対位置X "pos_y": 160.0, # 軌道面上の相対位置Y "yaw_deg": 12.0, # 姿勢 Yaw "pitch_deg": -4.0, # 姿勢 Pitch "roll_deg": 7.0, # 姿勢 Roll } if "process_logs" not in st.session_state: st.session_state.process_logs = [] # ---------------------------- # ユーティリティ # ---------------------------- def call_openai(prompt: str) -> str: if client is None: return "OPENAI_API_KEY が未設定です。ダミー応答を返します。" res = client.responses.create( model="gpt-4.1-mini", input=prompt, ) return res.output_text def update_satellite_state(step_idx: int): """工程ごとに衛星状態を少し変化させる""" s = st.session_state.sat_state s["angle_deg"] = (s["angle_deg"] + random.uniform(18, 38)) % 360 theta = math.radians(s["angle_deg"]) s["altitude_km"] = round(540 + random.uniform(-8, 8), 1) s["speed_kms"] = round(7.66 + random.uniform(-0.04, 0.04), 3) s["battery_pct"] = max(20, min(100, s["battery_pct"] - random.randint(1, 3))) s["temp_c"] = round(24 + random.uniform(-4, 6), 1) s["signal_db"] = max(35, min(99, s["signal_db"] + random.randint(-6, 4))) # 追加: 軌道位置を簡易更新 orbit_r = 480 s["pos_x"] = round(orbit_r * math.cos(theta) + random.uniform(-20, 20), 1) s["pos_y"] = round(orbit_r * math.sin(theta) + random.uniform(-20, 20), 1) # 追加: 地上局(原点近傍)からの距離レンジング gs_x, gs_y = 0.0, -420.0 s["range_km"] = round( math.sqrt((s["pos_x"] - gs_x) ** 2 + (s["pos_y"] - gs_y) ** 2) + random.uniform(-15, 15), 1 ) # 追加: 姿勢角を変化 s["yaw_deg"] = round((s["yaw_deg"] + random.uniform(-8, 8)), 1) s["pitch_deg"] = round(max(-45, min(45, s["pitch_deg"] + random.uniform(-4, 4))), 1) s["roll_deg"] = round(max(-45, min(45, s["roll_deg"] + random.uniform(-5, 5))), 1) modes = { 0: "BOOT", 1: "SENSING", 2: "ANALYZING", 3: "UPLINK", 4: "REPORTING", } events = { 0: "衛星起動シーケンス", 1: "テレメトリ取得中", 2: "観測データ解析中", 3: "地上局と通信中", 4: "ミッション結果反映" } s["mode"] = modes.get(step_idx, "ACTIVE") s["last_event"] = events.get(step_idx, "更新") s["health"] = "NOMINAL" if s["signal_db"] >= 50 and s["battery_pct"] >= 30 else "CHECK" def make_tracking_figure(): """ 右側に出す可視化: - 左: 地上局から衛星までのレンジング + 軌道位置 - 右: 衛星姿勢 (機体向き) """ s = st.session_state.sat_state gs_x, gs_y = 0.0, -420.0 sat_x, sat_y = s["pos_x"], s["pos_y"] fig = go.Figure() # ---------------------------- # 左エリア: レンジング / 軌道位置 # ---------------------------- # 軌道の目安円 orbit_x = [] orbit_y = [] for d in range(361): r = math.radians(d) orbit_x.append(480 * math.cos(r)) orbit_y.append(480 * math.sin(r)) fig.add_trace(go.Scatter( x=orbit_x, y=orbit_y, mode="lines", name="Reference Orbit", line=dict(width=2, dash="dot"), )) # 地上局 fig.add_trace(go.Scatter( x=[gs_x], y=[gs_y], mode="markers+text", name="Ground Station", text=["Ground"], textposition="bottom center", marker=dict(size=14, symbol="square"), )) # 衛星 fig.add_trace(go.Scatter( x=[sat_x], y=[sat_y], mode="markers+text", name="Satellite Position", text=["🛰️"], textposition="middle center", marker=dict(size=18), )) # レンジ線 fig.add_trace(go.Scatter( x=[gs_x, sat_x], y=[gs_y, sat_y], mode="lines", name="Range", line=dict(width=3), )) # レンジ注記 mid_x = (gs_x + sat_x) / 2 mid_y = (gs_y + sat_y) / 2 fig.add_annotation( x=mid_x, y=mid_y, text=f"Range: {s['range_km']} km", showarrow=False, yshift=12 ) # 位置注記 fig.add_annotation( x=sat_x, y=sat_y, text=f"Pos({s['pos_x']}, {s['pos_y']})", showarrow=True, ax=30, ay=-30 ) # ---------------------------- # 右エリア: 姿勢表示 # ---------------------------- # 右側に独立した姿勢表示領域を作る attitude_center_x = 980 attitude_center_y = 0 body_len = 120 yaw_rad = math.radians(s["yaw_deg"]) body_dx = body_len * math.cos(yaw_rad) body_dy = body_len * math.sin(yaw_rad) # 機体中心 fig.add_trace(go.Scatter( x=[attitude_center_x], y=[attitude_center_y], mode="markers+text", name="Attitude Center", text=["SAT"], textposition="middle center", marker=dict(size=22, symbol="diamond"), )) # 機体進行方向(Yaw) fig.add_trace(go.Scatter( x=[attitude_center_x, attitude_center_x + body_dx], y=[attitude_center_y, attitude_center_y + body_dy], mode="lines+markers", name="Yaw Direction", line=dict(width=4), marker=dict(size=8), )) # 簡易ソーラーパドル panel_span = 70 perp_dx = panel_span * math.cos(yaw_rad + math.pi / 2) perp_dy = panel_span * math.sin(yaw_rad + math.pi / 2) fig.add_trace(go.Scatter( x=[attitude_center_x - perp_dx, attitude_center_x + perp_dx], y=[attitude_center_y - perp_dy, attitude_center_y + perp_dy], mode="lines", name="Solar Panel Axis", line=dict(width=6), )) # 姿勢テキスト fig.add_annotation( x=attitude_center_x, y=attitude_center_y - 170, text=( f"Yaw: {s['yaw_deg']}°
" f"Pitch: {s['pitch_deg']}°
" f"Roll: {s['roll_deg']}°" ), showarrow=False ) fig.add_annotation( x=attitude_center_x, y=attitude_center_y + 150, text="Attitude", showarrow=False, font=dict(size=16) ) fig.update_layout( title="レンジング / 軌道位置 / 姿勢ビュー", height=420, margin=dict(l=10, r=10, t=45, b=10), showlegend=False, xaxis=dict( visible=False, range=[-650, 1250] ), yaxis=dict( visible=False, range=[-650, 650], scaleanchor="x", scaleratio=1 ), ) return fig def render_right_panel(target): """右半分の衛星可視化""" s = st.session_state.sat_state with target.container(): st.subheader("🛰️ Satellite Status") c1, c2, c3 = st.columns(3) c1.metric("高度", f"{s['altitude_km']} km") c2.metric("速度", f"{s['speed_kms']} km/s") c3.metric("バッテリー", f"{s['battery_pct']}%") c4, c5, c6 = st.columns(3) c4.metric("距離", f"{s['range_km']} km") c5.metric("信号", f"{s['signal_db']} dB") c6.metric("モード", s["mode"]) c7, c8, c9 = st.columns(3) c7.metric("Yaw", f"{s['yaw_deg']}°") c8.metric("Pitch", f"{s['pitch_deg']}°") c9.metric("Roll", f"{s['roll_deg']}°") st.plotly_chart(make_tracking_figure(), use_container_width=True) health_df = pd.DataFrame( { "項目": ["Battery", "Signal", "Thermal"], "値": [ s["battery_pct"], s["signal_db"], min(max(int((80 - abs(s["temp_c"] - 25) * 4)), 0), 100), ], } ) st.bar_chart(health_df.set_index("項目")) st.caption( f"Health: {s['health']} | Last event: {s['last_event']} | Updated: {datetime.now().strftime('%H:%M:%S')}" ) # ---------------------------- # UI # ---------------------------- st.title("🛰️ 衛星軌道保持AIシステム(PoC開発中)") st.write("⚠️本アプリはデモであり、表示される内容は全てダミーです。") st.write("左側で処理フロー、右側で衛星状態を可視化します。") mission_text = st.text_area( "ミッション指示", value="衛星の軌道を保持してください。", height=110, ) run = st.button("ミッション開始", type="primary") left_col, right_col = st.columns([1, 1], gap="large") with left_col: left_header = st.empty() progress_box = st.empty() status_box = st.empty() log_box = st.empty() result_box = st.empty() with right_col: sat_box = st.empty() render_right_panel(sat_box) # ---------------------------- # 実行 # ---------------------------- if run: steps = [ { "name": "制御計画/立案", "desc": "条件を満たすパラメータ(スラスタセット/噴射秒数)を設定します。", "action": lambda text: call_openai( f""" 以下のミッション条件をもとに、軌道制御計画を立案してください。 - 使用するスラスタセット - 想定される噴射秒数 - 計画時の前提条件 - 注意点 ミッション条件: {text} """.strip() ) }, { "name": "残推薬管理/確認", "desc": "軌道制御に必要な消費推薬量を見積もります。", "action": lambda text: call_openai( f""" 以下の制御計画をもとに、残推薬管理の観点で評価してください。 - 想定される推薬消費量 - 残量への影響 - 運用上の注意点 - 地上局向けコメント 制御計画: {text} """.strip() ) }, { "name": "軌道決定", "desc": "観測情報または制御結果をもとに衛星位置・軌道を決定します。", "action": lambda text: call_openai( f""" 以下の情報をもとに、軌道決定結果を日本語で簡潔にまとめてください。 - 現在の衛星位置・速度の整理 - 推定される軌道状態 - 判断根拠 - 不確かさや注意点 入力情報: {text} """.strip() ) }, { "name": "制御評価", "desc": "軌道制御の結果と効率を評価します。", "action": lambda text: call_openai( f""" 以下の軌道決定結果をもとに、制御評価を行ってください。 - 制御結果の妥当性 - 目標達成度 - 効率の評価 - 改善点 - 地上局向けの簡潔な総括 軌道決定結果: {text} """.strip() ) }, { "name": "レポーティング", "desc": "軌道制御の結果と効率をレポーティングします。", "action": lambda text: call_openai( f""" 以下の軌道決定結果をもとに、制御評価レポートを地上局向けに作成してください。 - 制御の成功可否 - 効率評価 - 推薬消費の妥当性 - 次回運用への示唆 軌道決定結果: {text} """.strip() ) } ] logs = [] outputs = {} left_header.markdown("## 進行状況") progress = progress_box.progress(0, text="待機中") with status_box.container(): with st.status("処理中...", expanded=True) as status: current_text = mission_text for i, step in enumerate(steps): pct = int(i / len(steps) * 100) update_satellite_state(i) render_right_panel(sat_box) progress.progress(pct, text=f"Step {i+1}/{len(steps)}: {step['name']}") logs.append(f"⏳ Step {i+1}: {step['name']} 開始") log_box.markdown("\n\n".join(logs)) st.write(f"### Step {i+1}. {step['name']}") st.write(step["desc"]) time.sleep(0.7) result = step["action"](current_text) outputs[step["name"]] = result current_text = result logs.append(f"✅ Step {i+1}: {step['name']} 完了") log_box.markdown("\n\n".join(logs)) with st.expander(f"{step['name']} の出力", expanded=False): st.write(result) update_satellite_state(i + 1) render_right_panel(sat_box) time.sleep(0.5) progress.progress(100, text="ミッション完了") status.update(label="完了", state="complete", expanded=True) with result_box.container(): st.markdown("## 最終レポート") st.write(outputs.get("レポーティング", "出力なし"))