Orbit-Maintenance / src /streamlit_app.py
stardust-coder's picture
[add] first commit
fcb2819
# app.py
import os
import time
import math
import random
from datetime import datetime
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
from openai import OpenAI
# ----------------------------
# ページ設定
# ----------------------------
st.set_page_config(
page_title="軌道制御支援AIシステム(PoC開発中)",
page_icon="🛰️",
layout="wide"
)
# ----------------------------
# OpenAI client
# ----------------------------
api_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=api_key) if api_key else None
# ----------------------------
# 初期状態
# ----------------------------
if "sat_state" not in st.session_state:
st.session_state.sat_state = {
"angle_deg": 20.0, # 軌道位置角
"altitude_km": 540.0,
"speed_kms": 7.66,
"battery_pct": 92,
"temp_c": 24.5,
"signal_db": 81,
"mode": "IDLE",
"health": "NOMINAL",
"last_event": "初期化完了",
# 追加: レンジング / 姿勢
"range_km": 1240.0, # 地上局から衛星までの距離
"pos_x": 420.0, # 軌道面上の相対位置X
"pos_y": 160.0, # 軌道面上の相対位置Y
"yaw_deg": 12.0, # 姿勢 Yaw
"pitch_deg": -4.0, # 姿勢 Pitch
"roll_deg": 7.0, # 姿勢 Roll
}
if "process_logs" not in st.session_state:
st.session_state.process_logs = []
# ----------------------------
# ユーティリティ
# ----------------------------
def call_openai(prompt: str) -> str:
if client is None:
return "OPENAI_API_KEY が未設定です。ダミー応答を返します。"
res = client.responses.create(
model="gpt-4.1-mini",
input=prompt,
)
return res.output_text
def update_satellite_state(step_idx: int):
"""工程ごとに衛星状態を少し変化させる"""
s = st.session_state.sat_state
s["angle_deg"] = (s["angle_deg"] + random.uniform(18, 38)) % 360
theta = math.radians(s["angle_deg"])
s["altitude_km"] = round(540 + random.uniform(-8, 8), 1)
s["speed_kms"] = round(7.66 + random.uniform(-0.04, 0.04), 3)
s["battery_pct"] = max(20, min(100, s["battery_pct"] - random.randint(1, 3)))
s["temp_c"] = round(24 + random.uniform(-4, 6), 1)
s["signal_db"] = max(35, min(99, s["signal_db"] + random.randint(-6, 4)))
# 追加: 軌道位置を簡易更新
orbit_r = 480
s["pos_x"] = round(orbit_r * math.cos(theta) + random.uniform(-20, 20), 1)
s["pos_y"] = round(orbit_r * math.sin(theta) + random.uniform(-20, 20), 1)
# 追加: 地上局(原点近傍)からの距離レンジング
gs_x, gs_y = 0.0, -420.0
s["range_km"] = round(
math.sqrt((s["pos_x"] - gs_x) ** 2 + (s["pos_y"] - gs_y) ** 2) + random.uniform(-15, 15),
1
)
# 追加: 姿勢角を変化
s["yaw_deg"] = round((s["yaw_deg"] + random.uniform(-8, 8)), 1)
s["pitch_deg"] = round(max(-45, min(45, s["pitch_deg"] + random.uniform(-4, 4))), 1)
s["roll_deg"] = round(max(-45, min(45, s["roll_deg"] + random.uniform(-5, 5))), 1)
modes = {
0: "BOOT",
1: "SENSING",
2: "ANALYZING",
3: "UPLINK",
4: "REPORTING",
}
events = {
0: "衛星起動シーケンス",
1: "テレメトリ取得中",
2: "観測データ解析中",
3: "地上局と通信中",
4: "ミッション結果反映"
}
s["mode"] = modes.get(step_idx, "ACTIVE")
s["last_event"] = events.get(step_idx, "更新")
s["health"] = "NOMINAL" if s["signal_db"] >= 50 and s["battery_pct"] >= 30 else "CHECK"
def make_tracking_figure():
"""
右側に出す可視化:
- 左: 地上局から衛星までのレンジング + 軌道位置
- 右: 衛星姿勢 (機体向き)
"""
s = st.session_state.sat_state
gs_x, gs_y = 0.0, -420.0
sat_x, sat_y = s["pos_x"], s["pos_y"]
fig = go.Figure()
# ----------------------------
# 左エリア: レンジング / 軌道位置
# ----------------------------
# 軌道の目安円
orbit_x = []
orbit_y = []
for d in range(361):
r = math.radians(d)
orbit_x.append(480 * math.cos(r))
orbit_y.append(480 * math.sin(r))
fig.add_trace(go.Scatter(
x=orbit_x,
y=orbit_y,
mode="lines",
name="Reference Orbit",
line=dict(width=2, dash="dot"),
))
# 地上局
fig.add_trace(go.Scatter(
x=[gs_x],
y=[gs_y],
mode="markers+text",
name="Ground Station",
text=["Ground"],
textposition="bottom center",
marker=dict(size=14, symbol="square"),
))
# 衛星
fig.add_trace(go.Scatter(
x=[sat_x],
y=[sat_y],
mode="markers+text",
name="Satellite Position",
text=["🛰️"],
textposition="middle center",
marker=dict(size=18),
))
# レンジ線
fig.add_trace(go.Scatter(
x=[gs_x, sat_x],
y=[gs_y, sat_y],
mode="lines",
name="Range",
line=dict(width=3),
))
# レンジ注記
mid_x = (gs_x + sat_x) / 2
mid_y = (gs_y + sat_y) / 2
fig.add_annotation(
x=mid_x,
y=mid_y,
text=f"Range: {s['range_km']} km",
showarrow=False,
yshift=12
)
# 位置注記
fig.add_annotation(
x=sat_x,
y=sat_y,
text=f"Pos({s['pos_x']}, {s['pos_y']})",
showarrow=True,
ax=30,
ay=-30
)
# ----------------------------
# 右エリア: 姿勢表示
# ----------------------------
# 右側に独立した姿勢表示領域を作る
attitude_center_x = 980
attitude_center_y = 0
body_len = 120
yaw_rad = math.radians(s["yaw_deg"])
body_dx = body_len * math.cos(yaw_rad)
body_dy = body_len * math.sin(yaw_rad)
# 機体中心
fig.add_trace(go.Scatter(
x=[attitude_center_x],
y=[attitude_center_y],
mode="markers+text",
name="Attitude Center",
text=["SAT"],
textposition="middle center",
marker=dict(size=22, symbol="diamond"),
))
# 機体進行方向(Yaw)
fig.add_trace(go.Scatter(
x=[attitude_center_x, attitude_center_x + body_dx],
y=[attitude_center_y, attitude_center_y + body_dy],
mode="lines+markers",
name="Yaw Direction",
line=dict(width=4),
marker=dict(size=8),
))
# 簡易ソーラーパドル
panel_span = 70
perp_dx = panel_span * math.cos(yaw_rad + math.pi / 2)
perp_dy = panel_span * math.sin(yaw_rad + math.pi / 2)
fig.add_trace(go.Scatter(
x=[attitude_center_x - perp_dx, attitude_center_x + perp_dx],
y=[attitude_center_y - perp_dy, attitude_center_y + perp_dy],
mode="lines",
name="Solar Panel Axis",
line=dict(width=6),
))
# 姿勢テキスト
fig.add_annotation(
x=attitude_center_x,
y=attitude_center_y - 170,
text=(
f"Yaw: {s['yaw_deg']}°<br>"
f"Pitch: {s['pitch_deg']}°<br>"
f"Roll: {s['roll_deg']}°"
),
showarrow=False
)
fig.add_annotation(
x=attitude_center_x,
y=attitude_center_y + 150,
text="Attitude",
showarrow=False,
font=dict(size=16)
)
fig.update_layout(
title="レンジング / 軌道位置 / 姿勢ビュー",
height=420,
margin=dict(l=10, r=10, t=45, b=10),
showlegend=False,
xaxis=dict(
visible=False,
range=[-650, 1250]
),
yaxis=dict(
visible=False,
range=[-650, 650],
scaleanchor="x",
scaleratio=1
),
)
return fig
def render_right_panel(target):
"""右半分の衛星可視化"""
s = st.session_state.sat_state
with target.container():
st.subheader("🛰️ Satellite Status")
c1, c2, c3 = st.columns(3)
c1.metric("高度", f"{s['altitude_km']} km")
c2.metric("速度", f"{s['speed_kms']} km/s")
c3.metric("バッテリー", f"{s['battery_pct']}%")
c4, c5, c6 = st.columns(3)
c4.metric("距離", f"{s['range_km']} km")
c5.metric("信号", f"{s['signal_db']} dB")
c6.metric("モード", s["mode"])
c7, c8, c9 = st.columns(3)
c7.metric("Yaw", f"{s['yaw_deg']}°")
c8.metric("Pitch", f"{s['pitch_deg']}°")
c9.metric("Roll", f"{s['roll_deg']}°")
st.plotly_chart(make_tracking_figure(), use_container_width=True)
health_df = pd.DataFrame(
{
"項目": ["Battery", "Signal", "Thermal"],
"値": [
s["battery_pct"],
s["signal_db"],
min(max(int((80 - abs(s["temp_c"] - 25) * 4)), 0), 100),
],
}
)
st.bar_chart(health_df.set_index("項目"))
st.caption(
f"Health: {s['health']} | Last event: {s['last_event']} | Updated: {datetime.now().strftime('%H:%M:%S')}"
)
# ----------------------------
# UI
# ----------------------------
st.title("🛰️ 衛星軌道保持AIシステム(PoC開発中)")
st.write("⚠️本アプリはデモであり、表示される内容は全てダミーです。")
st.write("左側で処理フロー、右側で衛星状態を可視化します。")
mission_text = st.text_area(
"ミッション指示",
value="衛星の軌道を保持してください。",
height=110,
)
run = st.button("ミッション開始", type="primary")
left_col, right_col = st.columns([1, 1], gap="large")
with left_col:
left_header = st.empty()
progress_box = st.empty()
status_box = st.empty()
log_box = st.empty()
result_box = st.empty()
with right_col:
sat_box = st.empty()
render_right_panel(sat_box)
# ----------------------------
# 実行
# ----------------------------
if run:
steps = [
{
"name": "制御計画/立案",
"desc": "条件を満たすパラメータ(スラスタセット/噴射秒数)を設定します。",
"action": lambda text: call_openai(
f"""
以下のミッション条件をもとに、軌道制御計画を立案してください。
- 使用するスラスタセット
- 想定される噴射秒数
- 計画時の前提条件
- 注意点
ミッション条件:
{text}
""".strip()
)
},
{
"name": "残推薬管理/確認",
"desc": "軌道制御に必要な消費推薬量を見積もります。",
"action": lambda text: call_openai(
f"""
以下の制御計画をもとに、残推薬管理の観点で評価してください。
- 想定される推薬消費量
- 残量への影響
- 運用上の注意点
- 地上局向けコメント
制御計画:
{text}
""".strip()
)
},
{
"name": "軌道決定",
"desc": "観測情報または制御結果をもとに衛星位置・軌道を決定します。",
"action": lambda text: call_openai(
f"""
以下の情報をもとに、軌道決定結果を日本語で簡潔にまとめてください。
- 現在の衛星位置・速度の整理
- 推定される軌道状態
- 判断根拠
- 不確かさや注意点
入力情報:
{text}
""".strip()
)
},
{
"name": "制御評価",
"desc": "軌道制御の結果と効率を評価します。",
"action": lambda text: call_openai(
f"""
以下の軌道決定結果をもとに、制御評価を行ってください。
- 制御結果の妥当性
- 目標達成度
- 効率の評価
- 改善点
- 地上局向けの簡潔な総括
軌道決定結果:
{text}
""".strip()
)
},
{
"name": "レポーティング",
"desc": "軌道制御の結果と効率をレポーティングします。",
"action": lambda text: call_openai(
f"""
以下の軌道決定結果をもとに、制御評価レポートを地上局向けに作成してください。
- 制御の成功可否
- 効率評価
- 推薬消費の妥当性
- 次回運用への示唆
軌道決定結果:
{text}
""".strip()
)
}
]
logs = []
outputs = {}
left_header.markdown("## 進行状況")
progress = progress_box.progress(0, text="待機中")
with status_box.container():
with st.status("処理中...", expanded=True) as status:
current_text = mission_text
for i, step in enumerate(steps):
pct = int(i / len(steps) * 100)
update_satellite_state(i)
render_right_panel(sat_box)
progress.progress(pct, text=f"Step {i+1}/{len(steps)}: {step['name']}")
logs.append(f"⏳ Step {i+1}: {step['name']} 開始")
log_box.markdown("\n\n".join(logs))
st.write(f"### Step {i+1}. {step['name']}")
st.write(step["desc"])
time.sleep(0.7)
result = step["action"](current_text)
outputs[step["name"]] = result
current_text = result
logs.append(f"✅ Step {i+1}: {step['name']} 完了")
log_box.markdown("\n\n".join(logs))
with st.expander(f"{step['name']} の出力", expanded=False):
st.write(result)
update_satellite_state(i + 1)
render_right_panel(sat_box)
time.sleep(0.5)
progress.progress(100, text="ミッション完了")
status.update(label="完了", state="complete", expanded=True)
with result_box.container():
st.markdown("## 最終レポート")
st.write(outputs.get("レポーティング", "出力なし"))