Spaces:
Sleeping
Sleeping
| # app.py | |
| import os | |
| import time | |
| import math | |
| import random | |
| from datetime import datetime | |
| import streamlit as st | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| from openai import OpenAI | |
| # ---------------------------- | |
| # ページ設定 | |
| # ---------------------------- | |
| st.set_page_config( | |
| page_title="軌道制御支援AIシステム(PoC開発中)", | |
| page_icon="🛰️", | |
| layout="wide" | |
| ) | |
| # ---------------------------- | |
| # OpenAI client | |
| # ---------------------------- | |
| api_key = os.getenv("OPENAI_API_KEY") | |
| client = OpenAI(api_key=api_key) if api_key else None | |
| # ---------------------------- | |
| # 初期状態 | |
| # ---------------------------- | |
| if "sat_state" not in st.session_state: | |
| st.session_state.sat_state = { | |
| "angle_deg": 20.0, # 軌道位置角 | |
| "altitude_km": 540.0, | |
| "speed_kms": 7.66, | |
| "battery_pct": 92, | |
| "temp_c": 24.5, | |
| "signal_db": 81, | |
| "mode": "IDLE", | |
| "health": "NOMINAL", | |
| "last_event": "初期化完了", | |
| # 追加: レンジング / 姿勢 | |
| "range_km": 1240.0, # 地上局から衛星までの距離 | |
| "pos_x": 420.0, # 軌道面上の相対位置X | |
| "pos_y": 160.0, # 軌道面上の相対位置Y | |
| "yaw_deg": 12.0, # 姿勢 Yaw | |
| "pitch_deg": -4.0, # 姿勢 Pitch | |
| "roll_deg": 7.0, # 姿勢 Roll | |
| } | |
| if "process_logs" not in st.session_state: | |
| st.session_state.process_logs = [] | |
| # ---------------------------- | |
| # ユーティリティ | |
| # ---------------------------- | |
| def call_openai(prompt: str) -> str: | |
| if client is None: | |
| return "OPENAI_API_KEY が未設定です。ダミー応答を返します。" | |
| res = client.responses.create( | |
| model="gpt-4.1-mini", | |
| input=prompt, | |
| ) | |
| return res.output_text | |
| def update_satellite_state(step_idx: int): | |
| """工程ごとに衛星状態を少し変化させる""" | |
| s = st.session_state.sat_state | |
| s["angle_deg"] = (s["angle_deg"] + random.uniform(18, 38)) % 360 | |
| theta = math.radians(s["angle_deg"]) | |
| s["altitude_km"] = round(540 + random.uniform(-8, 8), 1) | |
| s["speed_kms"] = round(7.66 + random.uniform(-0.04, 0.04), 3) | |
| s["battery_pct"] = max(20, min(100, s["battery_pct"] - random.randint(1, 3))) | |
| s["temp_c"] = round(24 + random.uniform(-4, 6), 1) | |
| s["signal_db"] = max(35, min(99, s["signal_db"] + random.randint(-6, 4))) | |
| # 追加: 軌道位置を簡易更新 | |
| orbit_r = 480 | |
| s["pos_x"] = round(orbit_r * math.cos(theta) + random.uniform(-20, 20), 1) | |
| s["pos_y"] = round(orbit_r * math.sin(theta) + random.uniform(-20, 20), 1) | |
| # 追加: 地上局(原点近傍)からの距離レンジング | |
| gs_x, gs_y = 0.0, -420.0 | |
| s["range_km"] = round( | |
| math.sqrt((s["pos_x"] - gs_x) ** 2 + (s["pos_y"] - gs_y) ** 2) + random.uniform(-15, 15), | |
| 1 | |
| ) | |
| # 追加: 姿勢角を変化 | |
| s["yaw_deg"] = round((s["yaw_deg"] + random.uniform(-8, 8)), 1) | |
| s["pitch_deg"] = round(max(-45, min(45, s["pitch_deg"] + random.uniform(-4, 4))), 1) | |
| s["roll_deg"] = round(max(-45, min(45, s["roll_deg"] + random.uniform(-5, 5))), 1) | |
| modes = { | |
| 0: "BOOT", | |
| 1: "SENSING", | |
| 2: "ANALYZING", | |
| 3: "UPLINK", | |
| 4: "REPORTING", | |
| } | |
| events = { | |
| 0: "衛星起動シーケンス", | |
| 1: "テレメトリ取得中", | |
| 2: "観測データ解析中", | |
| 3: "地上局と通信中", | |
| 4: "ミッション結果反映" | |
| } | |
| s["mode"] = modes.get(step_idx, "ACTIVE") | |
| s["last_event"] = events.get(step_idx, "更新") | |
| s["health"] = "NOMINAL" if s["signal_db"] >= 50 and s["battery_pct"] >= 30 else "CHECK" | |
| def make_tracking_figure(): | |
| """ | |
| 右側に出す可視化: | |
| - 左: 地上局から衛星までのレンジング + 軌道位置 | |
| - 右: 衛星姿勢 (機体向き) | |
| """ | |
| s = st.session_state.sat_state | |
| gs_x, gs_y = 0.0, -420.0 | |
| sat_x, sat_y = s["pos_x"], s["pos_y"] | |
| fig = go.Figure() | |
| # ---------------------------- | |
| # 左エリア: レンジング / 軌道位置 | |
| # ---------------------------- | |
| # 軌道の目安円 | |
| orbit_x = [] | |
| orbit_y = [] | |
| for d in range(361): | |
| r = math.radians(d) | |
| orbit_x.append(480 * math.cos(r)) | |
| orbit_y.append(480 * math.sin(r)) | |
| fig.add_trace(go.Scatter( | |
| x=orbit_x, | |
| y=orbit_y, | |
| mode="lines", | |
| name="Reference Orbit", | |
| line=dict(width=2, dash="dot"), | |
| )) | |
| # 地上局 | |
| fig.add_trace(go.Scatter( | |
| x=[gs_x], | |
| y=[gs_y], | |
| mode="markers+text", | |
| name="Ground Station", | |
| text=["Ground"], | |
| textposition="bottom center", | |
| marker=dict(size=14, symbol="square"), | |
| )) | |
| # 衛星 | |
| fig.add_trace(go.Scatter( | |
| x=[sat_x], | |
| y=[sat_y], | |
| mode="markers+text", | |
| name="Satellite Position", | |
| text=["🛰️"], | |
| textposition="middle center", | |
| marker=dict(size=18), | |
| )) | |
| # レンジ線 | |
| fig.add_trace(go.Scatter( | |
| x=[gs_x, sat_x], | |
| y=[gs_y, sat_y], | |
| mode="lines", | |
| name="Range", | |
| line=dict(width=3), | |
| )) | |
| # レンジ注記 | |
| mid_x = (gs_x + sat_x) / 2 | |
| mid_y = (gs_y + sat_y) / 2 | |
| fig.add_annotation( | |
| x=mid_x, | |
| y=mid_y, | |
| text=f"Range: {s['range_km']} km", | |
| showarrow=False, | |
| yshift=12 | |
| ) | |
| # 位置注記 | |
| fig.add_annotation( | |
| x=sat_x, | |
| y=sat_y, | |
| text=f"Pos({s['pos_x']}, {s['pos_y']})", | |
| showarrow=True, | |
| ax=30, | |
| ay=-30 | |
| ) | |
| # ---------------------------- | |
| # 右エリア: 姿勢表示 | |
| # ---------------------------- | |
| # 右側に独立した姿勢表示領域を作る | |
| attitude_center_x = 980 | |
| attitude_center_y = 0 | |
| body_len = 120 | |
| yaw_rad = math.radians(s["yaw_deg"]) | |
| body_dx = body_len * math.cos(yaw_rad) | |
| body_dy = body_len * math.sin(yaw_rad) | |
| # 機体中心 | |
| fig.add_trace(go.Scatter( | |
| x=[attitude_center_x], | |
| y=[attitude_center_y], | |
| mode="markers+text", | |
| name="Attitude Center", | |
| text=["SAT"], | |
| textposition="middle center", | |
| marker=dict(size=22, symbol="diamond"), | |
| )) | |
| # 機体進行方向(Yaw) | |
| fig.add_trace(go.Scatter( | |
| x=[attitude_center_x, attitude_center_x + body_dx], | |
| y=[attitude_center_y, attitude_center_y + body_dy], | |
| mode="lines+markers", | |
| name="Yaw Direction", | |
| line=dict(width=4), | |
| marker=dict(size=8), | |
| )) | |
| # 簡易ソーラーパドル | |
| panel_span = 70 | |
| perp_dx = panel_span * math.cos(yaw_rad + math.pi / 2) | |
| perp_dy = panel_span * math.sin(yaw_rad + math.pi / 2) | |
| fig.add_trace(go.Scatter( | |
| x=[attitude_center_x - perp_dx, attitude_center_x + perp_dx], | |
| y=[attitude_center_y - perp_dy, attitude_center_y + perp_dy], | |
| mode="lines", | |
| name="Solar Panel Axis", | |
| line=dict(width=6), | |
| )) | |
| # 姿勢テキスト | |
| fig.add_annotation( | |
| x=attitude_center_x, | |
| y=attitude_center_y - 170, | |
| text=( | |
| f"Yaw: {s['yaw_deg']}°<br>" | |
| f"Pitch: {s['pitch_deg']}°<br>" | |
| f"Roll: {s['roll_deg']}°" | |
| ), | |
| showarrow=False | |
| ) | |
| fig.add_annotation( | |
| x=attitude_center_x, | |
| y=attitude_center_y + 150, | |
| text="Attitude", | |
| showarrow=False, | |
| font=dict(size=16) | |
| ) | |
| fig.update_layout( | |
| title="レンジング / 軌道位置 / 姿勢ビュー", | |
| height=420, | |
| margin=dict(l=10, r=10, t=45, b=10), | |
| showlegend=False, | |
| xaxis=dict( | |
| visible=False, | |
| range=[-650, 1250] | |
| ), | |
| yaxis=dict( | |
| visible=False, | |
| range=[-650, 650], | |
| scaleanchor="x", | |
| scaleratio=1 | |
| ), | |
| ) | |
| return fig | |
| def render_right_panel(target): | |
| """右半分の衛星可視化""" | |
| s = st.session_state.sat_state | |
| with target.container(): | |
| st.subheader("🛰️ Satellite Status") | |
| c1, c2, c3 = st.columns(3) | |
| c1.metric("高度", f"{s['altitude_km']} km") | |
| c2.metric("速度", f"{s['speed_kms']} km/s") | |
| c3.metric("バッテリー", f"{s['battery_pct']}%") | |
| c4, c5, c6 = st.columns(3) | |
| c4.metric("距離", f"{s['range_km']} km") | |
| c5.metric("信号", f"{s['signal_db']} dB") | |
| c6.metric("モード", s["mode"]) | |
| c7, c8, c9 = st.columns(3) | |
| c7.metric("Yaw", f"{s['yaw_deg']}°") | |
| c8.metric("Pitch", f"{s['pitch_deg']}°") | |
| c9.metric("Roll", f"{s['roll_deg']}°") | |
| st.plotly_chart(make_tracking_figure(), use_container_width=True) | |
| health_df = pd.DataFrame( | |
| { | |
| "項目": ["Battery", "Signal", "Thermal"], | |
| "値": [ | |
| s["battery_pct"], | |
| s["signal_db"], | |
| min(max(int((80 - abs(s["temp_c"] - 25) * 4)), 0), 100), | |
| ], | |
| } | |
| ) | |
| st.bar_chart(health_df.set_index("項目")) | |
| st.caption( | |
| f"Health: {s['health']} | Last event: {s['last_event']} | Updated: {datetime.now().strftime('%H:%M:%S')}" | |
| ) | |
| # ---------------------------- | |
| # UI | |
| # ---------------------------- | |
| st.title("🛰️ 衛星軌道保持AIシステム(PoC開発中)") | |
| st.write("⚠️本アプリはデモであり、表示される内容は全てダミーです。") | |
| st.write("左側で処理フロー、右側で衛星状態を可視化します。") | |
| mission_text = st.text_area( | |
| "ミッション指示", | |
| value="衛星の軌道を保持してください。", | |
| height=110, | |
| ) | |
| run = st.button("ミッション開始", type="primary") | |
| left_col, right_col = st.columns([1, 1], gap="large") | |
| with left_col: | |
| left_header = st.empty() | |
| progress_box = st.empty() | |
| status_box = st.empty() | |
| log_box = st.empty() | |
| result_box = st.empty() | |
| with right_col: | |
| sat_box = st.empty() | |
| render_right_panel(sat_box) | |
| # ---------------------------- | |
| # 実行 | |
| # ---------------------------- | |
| if run: | |
| steps = [ | |
| { | |
| "name": "制御計画/立案", | |
| "desc": "条件を満たすパラメータ(スラスタセット/噴射秒数)を設定します。", | |
| "action": lambda text: call_openai( | |
| f""" | |
| 以下のミッション条件をもとに、軌道制御計画を立案してください。 | |
| - 使用するスラスタセット | |
| - 想定される噴射秒数 | |
| - 計画時の前提条件 | |
| - 注意点 | |
| ミッション条件: | |
| {text} | |
| """.strip() | |
| ) | |
| }, | |
| { | |
| "name": "残推薬管理/確認", | |
| "desc": "軌道制御に必要な消費推薬量を見積もります。", | |
| "action": lambda text: call_openai( | |
| f""" | |
| 以下の制御計画をもとに、残推薬管理の観点で評価してください。 | |
| - 想定される推薬消費量 | |
| - 残量への影響 | |
| - 運用上の注意点 | |
| - 地上局向けコメント | |
| 制御計画: | |
| {text} | |
| """.strip() | |
| ) | |
| }, | |
| { | |
| "name": "軌道決定", | |
| "desc": "観測情報または制御結果をもとに衛星位置・軌道を決定します。", | |
| "action": lambda text: call_openai( | |
| f""" | |
| 以下の情報をもとに、軌道決定結果を日本語で簡潔にまとめてください。 | |
| - 現在の衛星位置・速度の整理 | |
| - 推定される軌道状態 | |
| - 判断根拠 | |
| - 不確かさや注意点 | |
| 入力情報: | |
| {text} | |
| """.strip() | |
| ) | |
| }, | |
| { | |
| "name": "制御評価", | |
| "desc": "軌道制御の結果と効率を評価します。", | |
| "action": lambda text: call_openai( | |
| f""" | |
| 以下の軌道決定結果をもとに、制御評価を行ってください。 | |
| - 制御結果の妥当性 | |
| - 目標達成度 | |
| - 効率の評価 | |
| - 改善点 | |
| - 地上局向けの簡潔な総括 | |
| 軌道決定結果: | |
| {text} | |
| """.strip() | |
| ) | |
| }, | |
| { | |
| "name": "レポーティング", | |
| "desc": "軌道制御の結果と効率をレポーティングします。", | |
| "action": lambda text: call_openai( | |
| f""" | |
| 以下の軌道決定結果をもとに、制御評価レポートを地上局向けに作成してください。 | |
| - 制御の成功可否 | |
| - 効率評価 | |
| - 推薬消費の妥当性 | |
| - 次回運用への示唆 | |
| 軌道決定結果: | |
| {text} | |
| """.strip() | |
| ) | |
| } | |
| ] | |
| logs = [] | |
| outputs = {} | |
| left_header.markdown("## 進行状況") | |
| progress = progress_box.progress(0, text="待機中") | |
| with status_box.container(): | |
| with st.status("処理中...", expanded=True) as status: | |
| current_text = mission_text | |
| for i, step in enumerate(steps): | |
| pct = int(i / len(steps) * 100) | |
| update_satellite_state(i) | |
| render_right_panel(sat_box) | |
| progress.progress(pct, text=f"Step {i+1}/{len(steps)}: {step['name']}") | |
| logs.append(f"⏳ Step {i+1}: {step['name']} 開始") | |
| log_box.markdown("\n\n".join(logs)) | |
| st.write(f"### Step {i+1}. {step['name']}") | |
| st.write(step["desc"]) | |
| time.sleep(0.7) | |
| result = step["action"](current_text) | |
| outputs[step["name"]] = result | |
| current_text = result | |
| logs.append(f"✅ Step {i+1}: {step['name']} 完了") | |
| log_box.markdown("\n\n".join(logs)) | |
| with st.expander(f"{step['name']} の出力", expanded=False): | |
| st.write(result) | |
| update_satellite_state(i + 1) | |
| render_right_panel(sat_box) | |
| time.sleep(0.5) | |
| progress.progress(100, text="ミッション完了") | |
| status.update(label="完了", state="complete", expanded=True) | |
| with result_box.container(): | |
| st.markdown("## 最終レポート") | |
| st.write(outputs.get("レポーティング", "出力なし")) |