Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Video OCR Extractor - Streamlit GUI | |
| 動画から文字データを抽出してCSVに出力するGUIアプリケーション | |
| """ | |
| import streamlit as st | |
| import cv2 | |
| import easyocr | |
| import pandas as pd | |
| import numpy as np | |
| from PIL import Image, ImageDraw | |
| import io | |
| import tempfile | |
| import os | |
| from datetime import datetime | |
| import re | |
| import zipfile | |
| import unicodedata | |
| from streamlit_image_coordinates import streamlit_image_coordinates | |
| import matplotlib.pyplot as plt | |
| import matplotlib | |
| matplotlib.use('Agg') # 日本語フォント等のエラーを避けるため | |
| # ページ設定 | |
| st.set_page_config( | |
| page_title="Video OCR Extractor", | |
| page_icon="📹", | |
| layout="wide", | |
| initial_sidebar_state="expanded" | |
| ) | |
| class VideoProcessor: | |
| """動画処理クラス""" | |
| def load_video_info(video_path): | |
| """動画情報を取得""" | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| return None | |
| info = { | |
| 'fps': cap.get(cv2.CAP_PROP_FPS), | |
| 'total_frames': int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), | |
| 'width': int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), | |
| 'height': int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| } | |
| info['duration'] = info['total_frames'] / info['fps'] if info['fps'] > 0 else 0 | |
| cap.release() | |
| return info | |
| def get_frame(video_path, frame_number=0): | |
| """指定フレームを取得""" | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| return None | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) | |
| ret, frame = cap.read() | |
| cap.release() | |
| if ret: | |
| # BGRからRGBに変換 | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| return frame_rgb | |
| return None | |
| class OCRProcessor: | |
| """OCR処理クラス""" | |
| def __init__(self): | |
| self.reader = None | |
| def initialize(self): | |
| """EasyOCRを初期化""" | |
| if self.reader is None: | |
| with st.spinner('OCRエンジンを初期化中...'): | |
| self.reader = easyocr.Reader(['en'], gpu=True) | |
| return self.reader | |
| def preprocess_image(image): | |
| """OCR精度向上のための画像前処理(赤背景特化)""" | |
| # 1. チャンネル抽出によるグレースケール化 | |
| if len(image.shape) == 3: | |
| # RGB前提 (VideoProcessor.get_frameがRGBで返す) | |
| r_mean = np.mean(image[:, :, 0]) | |
| b_mean = np.mean(image[:, :, 2]) | |
| if r_mean > b_mean * 1.5: | |
| # 赤背景(R=High, B=Low)かつ白文字(R=High, B=High)の場合、 | |
| # Blueチャンネルが最もコントラストが高くなる | |
| gray = image[:, :, 2] | |
| else: | |
| gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) | |
| else: | |
| gray = image.copy() | |
| # 2. リサイズ (高速化のためスケールを2に抑制) | |
| scale = 2 | |
| height, width = gray.shape | |
| gray = cv2.resize(gray, (width * scale, height * scale), interpolation=cv2.INTER_CUBIC) | |
| # 3. コントラスト強調 | |
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) | |
| gray = clahe.apply(gray) | |
| # 4. 軽量なノイズ除去 (GaussianBlur) | |
| gray = cv2.GaussianBlur(gray, (5, 5), 0) | |
| # 5. シャープニング | |
| kernel = np.array([[-1,-1,-1], [-1, 9,-1], [-1,-1,-1]]) | |
| gray = cv2.filter2D(gray, -1, kernel) | |
| # 6. 二値化 (Otsu) | |
| _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) | |
| # 7. 白黒反転 (背景=白、文字=黒) | |
| # 元々「暗い背景に明るい文字」を想定しているため、二値化後は文字が255になる。 | |
| # これを反転させて背景を255(白)、文字を0(黒)にする。 | |
| binary = cv2.bitwise_not(binary) | |
| return binary | |
| def clean_ocr_text(text, label=""): | |
| """OCR結果をクリーニング(改良版v4: 時間を秒に変換)""" | |
| if not text or text.strip() == "": | |
| return None | |
| # 全角文字などを正規化(: -> :) | |
| text = unicodedata.normalize('NFKC', text) | |
| # --- 手順A: ラベルによる分岐(最優先) --- | |
| is_time = False | |
| if label and ('time' in label.lower() or 't+' in label.lower()): | |
| is_time = True | |
| if is_time: | |
| # --- 手順B: 時間用補正 (Time専用ロジック) --- | |
| # 1. ドットとスペースをコロンに置換 | |
| temp_text = text.replace('.', ':').replace(' ', ':') | |
| # 2. 連続するコロンを1つにまとめる | |
| temp_text = re.sub(r':+', ':', temp_text) | |
| # 3. 先頭と末尾のコロンを削除 | |
| temp_text = temp_text.strip(':') | |
| # 4. コロンで分割して数値化 | |
| parts = temp_text.split(':') | |
| numeric_parts = [] | |
| for p in parts: | |
| digits = re.sub(r'\D', '', p) | |
| if digits: | |
| val = int(digits) | |
| # --- 60進法補正 (0/8/9 誤読対策) --- | |
| # 分 or 秒のパート(通常2桁)が60以上の場合は、十の位を0とみなす | |
| if len(digits) == 2 and val >= 60: | |
| val = val % 10 # 十の位を0にする | |
| numeric_parts.append(val) | |
| # 秒に換算 | |
| if len(numeric_parts) == 3: # HH:MM:SS | |
| h, m, s = numeric_parts | |
| return float(h * 3600 + m * 60 + s) | |
| elif len(numeric_parts) == 2: # MM:SS | |
| m, s = numeric_parts | |
| return float(m * 60 + s) | |
| elif len(numeric_parts) == 1: # SS | |
| return float(numeric_parts[0]) | |
| return None | |
| else: | |
| # --- 手順C: 数値用補正 (Speed/Altitude用) --- | |
| # Altitudeの場合はスペースをドットに置換(OCRの誤読対策) | |
| # Speed等の場合は単にスペースを削除 | |
| if label and 'altitude' in label.lower(): | |
| # まず数字、ドット、スペース、マイナス以外を除去 | |
| cleaned = re.sub(r'[^\d\.\-\s]', '', text) | |
| # スペースをドットに変換 | |
| cleaned = cleaned.replace(' ', '.') | |
| # 連続するドットを1つに | |
| cleaned = re.sub(r'\.+', '.', cleaned) | |
| else: | |
| # 数字、ドット、マイナス以外を除去 | |
| cleaned = re.sub(r'[^\d\.\-]', '', text) | |
| # 数値変換 | |
| try: | |
| val = float(cleaned) | |
| # 異常値フィルタリング (Speedのみ) | |
| if label and 'speed' in label.lower(): | |
| if val > 50000: return None | |
| return val | |
| except ValueError: | |
| return None | |
| def process_roi(self, frame, roi, label=""): | |
| """ROI領域をOCR処理(改良版)""" | |
| x, y, w, h = roi | |
| # ROIにパディング(余白)を追加して切り出す | |
| padding = 5 # ピクセル | |
| frame_h, frame_w = frame.shape[:2] | |
| # パディングを加えた座標(画像境界を超えないようにクリップ) | |
| x_start = max(0, x - padding) | |
| y_start = max(0, y - padding) | |
| x_end = min(frame_w, x + w + padding) | |
| y_end = min(frame_h, y + h + padding) | |
| # ROIを切り出す | |
| roi_image = frame[y_start:y_end, x_start:x_end] | |
| # 前処理 | |
| processed = self.preprocess_image(roi_image) | |
| # OCR実行(パラメータを最適化) | |
| results = self.reader.readtext( | |
| processed, | |
| detail=0, | |
| paragraph=False, | |
| allowlist='0123456789.:', # 許可する文字を制限 | |
| batch_size=1 | |
| ) | |
| text = ' '.join(results).strip() | |
| # データクリーニング | |
| value = self.clean_ocr_text(text, label) | |
| return value, text | |
| class PostProcessor: | |
| """データ抽出後の後処理クラス""" | |
| def correct_time_outliers(df, column='Time'): | |
| """ | |
| 時間の外れ値を補正する | |
| ロジック: | |
| 1. 前後5レコード(計10レコード)の時間(Hour)を確認 | |
| 2. 周囲が全て同じ時間で、対象レコードだけ異なる場合、周囲の時間に合わせる | |
| 3. 前5レコードと後5レコードで時間が異なる場合(時間の変わり目)は補正しない | |
| """ | |
| if df is None or df.empty or column not in df.columns: | |
| return df | |
| # 元のデータを変更しないようにコピー | |
| df_corrected = df.copy() | |
| # 前後5レコード確認するため、最低11レコード必要 | |
| if len(df) < 11: | |
| return df_corrected | |
| # 時間(Hour)を計算 | |
| values = df_corrected[column].values | |
| # NaNチェック | |
| if np.isnan(values).any(): | |
| # NaNがある場合は補正スキップするか、fillする。ここではスキップ実装 | |
| pass | |
| hours = np.floor(values / 3600).astype(int) | |
| # 5番目から 最後-5番目までループ | |
| for i in range(5, len(df) - 5): | |
| curr_h = hours[i] | |
| # 前後5レコード取得 | |
| prev_5 = hours[i-5:i] | |
| next_5 = hours[i+1:i+6] | |
| # 前5つが全て同じ値か確認 | |
| if not np.all(prev_5 == prev_5[0]): | |
| continue | |
| # 後5つが全て同じ値か確認 | |
| if not np.all(next_5 == next_5[0]): | |
| continue | |
| # 前後のブロックが一致しているか確認(安定しているか) | |
| stable_h = prev_5[0] | |
| if next_5[0] != stable_h: | |
| # 前後で値が違う=時間の変わり目なので補正しない | |
| continue | |
| # 現在の値が周囲と異なる場合=外れ値 | |
| if curr_h != stable_h: | |
| # 補正実行 | |
| # 分・秒はそのままに、時間だけstable_hに置き換える | |
| old_seconds = values[i] | |
| remainder = old_seconds % 3600 | |
| new_seconds = (stable_h * 3600) + remainder | |
| values[i] = new_seconds | |
| df_corrected[column] = values | |
| return df_corrected | |
| def init_session_state(): | |
| """セッション状態の初期化""" | |
| if 'video_paths' not in st.session_state: | |
| st.session_state.video_paths = [] # 複数動画対応 | |
| if 'video_path' not in st.session_state: | |
| st.session_state.video_path = None # 後方互換性のため保持 | |
| if 'video_info' not in st.session_state: | |
| st.session_state.video_info = None | |
| if 'video_infos' not in st.session_state: | |
| st.session_state.video_infos = [] # 複数動画の情報 | |
| if 'rois' not in st.session_state: | |
| st.session_state.rois = [] | |
| if 'roi_labels' not in st.session_state: | |
| st.session_state.roi_labels = [] | |
| if 'ocr_processor' not in st.session_state: | |
| st.session_state.ocr_processor = OCRProcessor() | |
| if 'extracted_data' not in st.session_state: | |
| st.session_state.extracted_data = None | |
| if 'batch_extracted_data' not in st.session_state: | |
| st.session_state.batch_extracted_data = {} # {video_path: dataframe} | |
| if 'current_step' not in st.session_state: | |
| st.session_state.current_step = 1 | |
| if 'batch_mode' not in st.session_state: | |
| st.session_state.batch_mode = False | |
| def main(): | |
| """メイン関数""" | |
| init_session_state() | |
| # タイトル | |
| st.title("📹 Video OCR Extractor") | |
| st.markdown("動画から文字データを抽出してCSVに出力するツール") | |
| # サイドバー - ステップ表示 | |
| with st.sidebar: | |
| st.header("処理ステップ") | |
| steps = [ | |
| "📹 Step 1: 動画アップロード", | |
| "📍 Step 2: ROI選択", | |
| "🔍 Step 3: データ抽出", | |
| "📊 Step 4: 結果確認", | |
| "📂 Step 5: ダウンロード" | |
| ] | |
| for i, step in enumerate(steps, 1): | |
| if i == st.session_state.current_step: | |
| st.markdown(f"**→ {step}**") | |
| elif i < st.session_state.current_step: | |
| st.markdown(f"✅ {step}") | |
| else: | |
| st.markdown(f"⚪ {step}") | |
| st.markdown("---") | |
| if st.button("🔄 最初からやり直す"): | |
| for key in list(st.session_state.keys()): | |
| del st.session_state[key] | |
| st.rerun() | |
| # Step 1: 動画アップロード | |
| if st.session_state.current_step == 1: | |
| step1_video_upload() | |
| # Step 2: ROI選択 | |
| elif st.session_state.current_step == 2: | |
| step2_roi_selection() | |
| # Step 3: データ抽出 | |
| elif st.session_state.current_step == 3: | |
| step3_data_extraction() | |
| # Step 4: 結果確認 | |
| elif st.session_state.current_step == 4: | |
| step4_result_confirmation() | |
| # Step 5: ダウンロード | |
| elif st.session_state.current_step == 5: | |
| step5_download() | |
| def step1_video_upload(): | |
| """Step 1: 動画アップロード""" | |
| st.header("📹 Step 1: 動画アップロード") | |
| # 処理モード選択 | |
| mode = st.radio( | |
| "処理モード", | |
| ["単一動画", "複数動画(バッチ処理)"], | |
| help="複数動画を選択すると、同じROIで全ての動画を処理できます" | |
| ) | |
| batch_mode = (mode == "複数動画(バッチ処理)") | |
| st.session_state.batch_mode = batch_mode | |
| if batch_mode: | |
| st.info("📦 複数の動画を同じROIで一括処理します") | |
| step1_batch_upload() | |
| else: | |
| st.info("🎬 1つの動画を処理します") | |
| step1_single_upload() | |
| def step1_single_upload(): | |
| """単一動画のアップロード""" | |
| # ファイルアップローダー | |
| uploaded_file = st.file_uploader( | |
| "動画ファイルを選択", | |
| type=['mp4', 'avi', 'mov', 'mkv'], | |
| key='video_uploader' | |
| ) | |
| # またはローカルパス指定 | |
| st.markdown("**または**") | |
| local_path = st.text_input( | |
| "ローカルファイルパスを指定", | |
| placeholder="/path/to/video.mp4" | |
| ) | |
| if uploaded_file is not None: | |
| # 一時ファイルに保存 | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file: | |
| tmp_file.write(uploaded_file.read()) | |
| video_path = tmp_file.name | |
| process_video(video_path, uploaded_file.name) | |
| elif local_path and os.path.exists(local_path): | |
| if st.button("この動画を使用"): | |
| process_video(local_path, os.path.basename(local_path)) | |
| def step1_batch_upload(): | |
| """複数動画のバッチアップロード""" | |
| st.markdown("### 方法1: ファイルアップロード") | |
| # 複数ファイルアップローダー | |
| uploaded_files = st.file_uploader( | |
| "動画ファイルを選択(複数可)", | |
| type=['mp4', 'avi', 'mov', 'mkv'], | |
| accept_multiple_files=True, | |
| key='batch_video_uploader' | |
| ) | |
| st.markdown("### 方法2: ローカルパス指定") | |
| st.markdown("複数のパスを改行で区切って入力してください") | |
| local_paths_text = st.text_area( | |
| "ローカルファイルパス(1行に1ファイル)", | |
| placeholder="/path/to/video1.mp4\n/path/to/video2.mp4\n/path/to/video3.mp4", | |
| height=150 | |
| ) | |
| video_paths = [] | |
| video_names = [] | |
| # アップロードされたファイルを処理 | |
| if uploaded_files: | |
| for uploaded_file in uploaded_files: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file: | |
| tmp_file.write(uploaded_file.read()) | |
| video_paths.append(tmp_file.name) | |
| video_names.append(uploaded_file.name) | |
| # ローカルパスを処理 | |
| if local_paths_text: | |
| paths = [p.strip() for p in local_paths_text.split('\n') if p.strip()] | |
| for path in paths: | |
| if os.path.exists(path): | |
| video_paths.append(path) | |
| video_names.append(os.path.basename(path)) | |
| else: | |
| st.warning(f"⚠️ ファイルが見つかりません: {path}") | |
| if video_paths: | |
| st.success(f"✅ {len(video_paths)}個の動画を選択しました") | |
| # 動画リストを表示 | |
| for i, (path, name) in enumerate(zip(video_paths, video_names)): | |
| st.text(f"{i+1}. {name}") | |
| if st.button("これらの動画を使用", type="primary"): | |
| process_batch_videos(video_paths, video_names) | |
| def process_video(video_path, video_name): | |
| """動画を処理""" | |
| st.session_state.video_path = video_path | |
| # 動画情報を取得 | |
| video_info = VideoProcessor.load_video_info(video_path) | |
| if video_info is None: | |
| st.error("動画を読み込めませんでした") | |
| return | |
| st.session_state.video_info = video_info | |
| # 動画情報を表示 | |
| st.success(f"✅ 動画を読み込みました: {video_name}") | |
| col1, col2, col3, col4 = st.columns(4) | |
| with col1: | |
| st.metric("解像度", f"{video_info['width']}x{video_info['height']}") | |
| with col2: | |
| st.metric("FPS", f"{video_info['fps']:.2f}") | |
| with col3: | |
| st.metric("総フレーム数", f"{video_info['total_frames']:,}") | |
| with col4: | |
| st.metric("長さ", f"{video_info['duration']:.1f}秒") | |
| # プレビュー(2分後のフレーム) | |
| preview_time = 120 # 2分 = 120秒 | |
| preview_frame_number = int(video_info['fps'] * preview_time) | |
| # 動画の長さより長い場合は中間のフレームを使用 | |
| if preview_frame_number >= video_info['total_frames']: | |
| preview_frame_number = video_info['total_frames'] // 2 | |
| preview_time = preview_frame_number / video_info['fps'] | |
| frame = VideoProcessor.get_frame(video_path, preview_frame_number) | |
| if frame is not None: | |
| st.image(frame, caption=f"プレビュー({preview_time:.1f}秒後のフレーム)", use_container_width=True) | |
| # 次のステップへ | |
| if st.button("次へ:ROI選択 →", type="primary"): | |
| st.session_state.current_step = 2 | |
| st.rerun() | |
| def process_batch_videos(video_paths, video_names): | |
| """複数動画を処理""" | |
| st.session_state.video_paths = video_paths | |
| st.session_state.video_names = video_names | |
| # 各動画の情報を取得 | |
| video_infos = [] | |
| for path in video_paths: | |
| video_info = VideoProcessor.load_video_info(path) | |
| if video_info: | |
| video_infos.append(video_info) | |
| else: | |
| st.error(f"❌ 動画を読み込めませんでした: {path}") | |
| return | |
| st.session_state.video_infos = video_infos | |
| # 最初の動画をプレビュー用に設定 | |
| st.session_state.video_path = video_paths[0] | |
| st.session_state.video_info = video_infos[0] | |
| st.success(f"✅ {len(video_paths)}個の動画を読み込みました") | |
| # 動画情報の概要を表示 | |
| st.subheader("動画一覧") | |
| for i, (name, info) in enumerate(zip(video_names, video_infos)): | |
| col1, col2, col3, col4 = st.columns([2, 1, 1, 1]) | |
| with col1: | |
| st.text(f"{i+1}. {name}") | |
| with col2: | |
| st.text(f"{info['width']}x{info['height']}") | |
| with col3: | |
| st.text(f"{info['fps']:.1f} fps") | |
| with col4: | |
| st.text(f"{info['duration']:.1f}s") | |
| # 最初の動画のプレビュー | |
| st.subheader("プレビュー(最初の動画)") | |
| preview_time = 120 | |
| preview_frame_number = int(video_infos[0]['fps'] * preview_time) | |
| if preview_frame_number >= video_infos[0]['total_frames']: | |
| preview_frame_number = video_infos[0]['total_frames'] // 2 | |
| preview_time = preview_frame_number / video_infos[0]['fps'] | |
| frame = VideoProcessor.get_frame(video_paths[0], preview_frame_number) | |
| if frame is not None: | |
| st.image(frame, caption=f"{video_names[0]} - {preview_time:.1f}秒後", use_container_width=True) | |
| # 次のステップへ | |
| if st.button("次へ:ROI選択 →", type="primary"): | |
| st.session_state.current_step = 2 | |
| st.rerun() | |
| def step2_roi_selection(): | |
| """Step 2: ROI選択""" | |
| st.header("📍 Step 2: ROI選択") | |
| video_path = st.session_state.video_path | |
| video_info = st.session_state.video_info | |
| # 選択モードの説明 | |
| st.info("🎯 **ROIを選択する方法を選んでください**") | |
| # タブで2つの方法を提供 | |
| tab1, tab2 = st.tabs(["🖱️ マウスで選択(推奨)", "⌨️ 座標を入力"]) | |
| with tab1: | |
| roi_selection_with_mouse() | |
| with tab2: | |
| roi_selection_with_input() | |
| def roi_selection_with_mouse(): | |
| """マウスクリックによるROI選択(アプリ内)""" | |
| video_path = st.session_state.video_path | |
| video_info = st.session_state.video_info | |
| st.markdown("### 🖱️ マウスでROIを選択") | |
| # 使い方の説明 | |
| with st.expander("📖 使い方", expanded=False): | |
| st.markdown(""" | |
| **ROI選択の手順:** | |
| 1. スライダーでデータが表示されているフレームを選択 | |
| 2. 画像上で **左上の角** をクリック | |
| 3. 次に **右下の角** をクリックすると矩形が描画されます | |
| 4. ラベルを入力して「ROIを追加」ボタンをクリック | |
| 5. 複数のROIを選択できます | |
| 6. すべて選択したら「次へ:データ抽出 →」をクリック | |
| **Tips:** | |
| - ROIは正確に選択する必要はありません。データが含まれていればOKです | |
| - 選択をやり直す場合は、「選択をリセット」ボタンをクリックしてください | |
| """) | |
| # フレーム選択 | |
| default_time = 120 # 2分 = 120秒 | |
| default_frame = int(video_info['fps'] * default_time) | |
| if default_frame >= video_info['total_frames']: | |
| default_frame = video_info['total_frames'] // 2 | |
| frame_number = st.slider( | |
| "プレビューフレームを選択", | |
| 0, | |
| video_info['total_frames'] - 1, | |
| default_frame, | |
| help="データが表示されているフレームを選択してください(デフォルト: 2分後)", | |
| key="mouse_frame_slider" | |
| ) | |
| # フレームを取得 (キャッシュ使用) | |
| if 'cached_frame' not in st.session_state or st.session_state.get('cached_frame_number') != frame_number: | |
| frame = VideoProcessor.get_frame(video_path, frame_number) | |
| if frame is None: | |
| st.error("フレームを読み込めませんでした") | |
| return | |
| st.session_state.cached_frame = frame | |
| st.session_state.cached_frame_number = frame_number | |
| else: | |
| frame = st.session_state.cached_frame | |
| # セッションステートの初期化 | |
| if 'mouse_selected_rois' not in st.session_state: | |
| st.session_state.mouse_selected_rois = [] | |
| if 'mouse_roi_labels' not in st.session_state: | |
| st.session_state.mouse_roi_labels = [] | |
| if 'mouse_click_points' not in st.session_state: | |
| st.session_state.mouse_click_points = [] | |
| if 'last_click' not in st.session_state: | |
| st.session_state.last_click = None | |
| if 'roi_reset_id' not in st.session_state: | |
| st.session_state.roi_reset_id = 0 | |
| # 既存のROIを画像に描画 | |
| display_frame = frame.copy() | |
| if len(st.session_state.mouse_selected_rois) > 0: | |
| pil_image = Image.fromarray(display_frame) # RGBのまま処理 | |
| draw = ImageDraw.Draw(pil_image) | |
| for roi, label in zip(st.session_state.mouse_selected_rois, st.session_state.mouse_roi_labels): | |
| x, y, w, h = roi | |
| draw.rectangle([x, y, x+w, y+h], outline="green", width=3) | |
| draw.text((x, y-20), label, fill="green") | |
| display_frame = np.array(pil_image) # RGBのまま戻す | |
| # 現在選択中の点・矩形を描画 | |
| if len(st.session_state.mouse_click_points) == 1: | |
| pil_image = Image.fromarray(display_frame) # RGB | |
| draw = ImageDraw.Draw(pil_image) | |
| x, y = st.session_state.mouse_click_points[0] | |
| draw.ellipse([x-5, y-5, x+5, y+5], fill="red", outline="red") | |
| draw.text((x+10, y), "左上", fill="red") | |
| display_frame = np.array(pil_image) # RGB | |
| elif len(st.session_state.mouse_click_points) == 2 and 'temp_roi' in st.session_state: | |
| # 選択中の矩形を描画 | |
| pil_image = Image.fromarray(display_frame) # RGB | |
| draw = ImageDraw.Draw(pil_image) | |
| x, y, w, h = st.session_state.temp_roi | |
| draw.rectangle([x, y, x+w, y+h], outline="yellow", width=3) | |
| draw.text((x, y-20), "選択中", fill="yellow") | |
| display_frame = np.array(pil_image) # RGB | |
| # 画像を表示してクリック座標を取得 | |
| st.markdown("### 📸 画像をクリックしてROIを選択") | |
| col1, col2 = st.columns([3, 1]) | |
| with col1: | |
| # RGB形式なのでそのまま表示 | |
| display_frame_rgb = display_frame | |
| # クリック可能な画像を表示 | |
| value = streamlit_image_coordinates( | |
| Image.fromarray(display_frame_rgb), | |
| key=f"roi_selector_{st.session_state.roi_reset_id}" | |
| ) | |
| # クリックされた座標を処理 | |
| if value is not None: | |
| clicked_x = value["x"] | |
| clicked_y = value["y"] | |
| current_click = (clicked_x, clicked_y) | |
| # 前回と同じクリックの場合はスキップ | |
| if current_click == st.session_state.last_click: | |
| pass # 同じクリックなので何もしない | |
| else: | |
| # 新しいクリック | |
| st.session_state.last_click = current_click | |
| if len(st.session_state.mouse_click_points) == 0: | |
| # 1点目(左上) | |
| st.session_state.mouse_click_points = [(clicked_x, clicked_y)] | |
| st.info(f"✅ 左上の角を選択しました: ({clicked_x}, {clicked_y})") | |
| st.info("👉 次に右下の角をクリックしてください") | |
| st.rerun() | |
| elif len(st.session_state.mouse_click_points) == 1: | |
| # 2点目(右下) | |
| x1, y1 = st.session_state.mouse_click_points[0] | |
| x2, y2 = clicked_x, clicked_y | |
| # 座標を正規化(左上が小さい値になるように) | |
| x = min(x1, x2) | |
| y = min(y1, y2) | |
| w = abs(x2 - x1) | |
| h = abs(y2 - y1) | |
| # 一時的に保存 | |
| st.session_state.temp_roi = (x, y, w, h) | |
| st.session_state.mouse_click_points.append((clicked_x, clicked_y)) | |
| st.success(f"✅ 矩形を選択しました: x={x}, y={y}, width={w}, height={h}") | |
| st.rerun() | |
| with col2: | |
| st.markdown("#### 選択状況") | |
| if len(st.session_state.mouse_click_points) == 0: | |
| st.info("👆 画像の左上の角をクリックしてください") | |
| elif len(st.session_state.mouse_click_points) == 1: | |
| x, y = st.session_state.mouse_click_points[0] | |
| st.success(f"左上: ({x}, {y})") | |
| st.info("👆 右下の角をクリックしてください") | |
| elif len(st.session_state.mouse_click_points) == 2: | |
| st.success("矩形を選択しました!") | |
| st.info("👇 ラベルを入力して追加してください") | |
| st.markdown("---") | |
| st.markdown(f"**登録済みROI: {len(st.session_state.mouse_selected_rois)}個**") | |
| for i, label in enumerate(st.session_state.mouse_roi_labels): | |
| st.text(f"{i+1}. {label}") | |
| # ROIが選択された場合、ラベル入力とROI追加 | |
| if len(st.session_state.mouse_click_points) == 2 and 'temp_roi' in st.session_state: | |
| st.markdown("---") | |
| st.markdown("### ✏️ ROIにラベルを付ける") | |
| # デフォルトラベル | |
| default_labels = ["Time", "Speed", "Altitude", "Stage", "Velocity", "Apogee"] | |
| roi_index = len(st.session_state.mouse_selected_rois) | |
| default_label = default_labels[roi_index] if roi_index < len(default_labels) else f"ROI_{roi_index + 1}" | |
| col1, col2 = st.columns([2, 1]) | |
| with col1: | |
| label_input = st.text_input( | |
| "ラベル名", | |
| value=default_label, | |
| key="mouse_label_input" | |
| ) | |
| with col2: | |
| if st.button("ROIを追加", type="primary", key="add_roi_btn"): | |
| if label_input: | |
| st.session_state.mouse_selected_rois.append(st.session_state.temp_roi) | |
| st.session_state.mouse_roi_labels.append(label_input) | |
| # リセット | |
| del st.session_state.temp_roi | |
| st.session_state.mouse_click_points = [] | |
| st.session_state.last_click = None | |
| st.session_state.roi_reset_id += 1 # コンポーネントの状態をリセット | |
| st.success(f"✅ ROI '{label_input}' を追加しました!") | |
| st.rerun() | |
| # プレビュー表示 | |
| x, y, w, h = st.session_state.temp_roi | |
| if w > 0 and h > 0: | |
| roi_preview = frame[y:y+h, x:x+w] | |
| # RGBなのでそのまま表示 | |
| st.image(roi_preview, caption=f"プレビュー: {label_input}", width=400) | |
| # 選択をリセット | |
| if st.button("選択をリセット", key="reset_selection"): | |
| st.session_state.mouse_click_points = [] | |
| st.session_state.last_click = None | |
| if 'temp_roi' in st.session_state: | |
| del st.session_state.temp_roi | |
| st.session_state.roi_reset_id += 1 # コンポーネントの状態をリセット | |
| st.rerun() | |
| # 選択済みROIのプレビュー | |
| if len(st.session_state.mouse_selected_rois) > 0: | |
| st.markdown("---") | |
| st.markdown("### 📋 選択済みROI一覧") | |
| cols = st.columns(min(3, len(st.session_state.mouse_selected_rois))) | |
| for i, (roi, label) in enumerate(zip(st.session_state.mouse_selected_rois, st.session_state.mouse_roi_labels)): | |
| x, y, w, h = roi | |
| if w > 0 and h > 0: | |
| roi_preview = frame[y:y+h, x:x+w] | |
| roi_preview_rgb = roi_preview | |
| with cols[i % 3]: | |
| st.image(roi_preview_rgb, caption=f"{i+1}. {label}\n({x},{y},{w},{h})", use_container_width=True) | |
| # 最後のROIを削除 | |
| if st.button("❌ 最後のROIを削除", key="delete_last_roi"): | |
| st.session_state.mouse_selected_rois.pop() | |
| st.session_state.mouse_roi_labels.pop() | |
| st.rerun() | |
| # 全てクリア | |
| if st.button("🗑️ 全てクリア", key="clear_all_rois"): | |
| st.session_state.mouse_selected_rois = [] | |
| st.session_state.mouse_roi_labels = [] | |
| st.session_state.mouse_click_points = [] | |
| st.session_state.last_click = None | |
| if 'temp_roi' in st.session_state: | |
| del st.session_state.temp_roi | |
| st.session_state.roi_reset_id += 1 # コンポーネントの状態をリセット | |
| st.rerun() | |
| # ROIを保存して次へ | |
| if len(st.session_state.mouse_selected_rois) > 0: | |
| st.session_state.rois = st.session_state.mouse_selected_rois | |
| st.session_state.roi_labels = st.session_state.mouse_roi_labels | |
| st.markdown("---") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("← 戻る", key="mouse_back"): | |
| st.session_state.current_step = 1 | |
| st.rerun() | |
| with col2: | |
| if st.button("次へ:データ抽出 →", type="primary", key="mouse_next"): | |
| st.session_state.current_step = 3 | |
| st.rerun() | |
| else: | |
| if st.button("← 戻る", key="mouse_back_only"): | |
| st.session_state.current_step = 1 | |
| st.rerun() | |
| def roi_selection_with_input(): | |
| """座標入力によるROI選択(従来の方法)""" | |
| video_path = st.session_state.video_path | |
| video_info = st.session_state.video_info | |
| st.info("座標を手動で入力してROIを設定します") | |
| # フレーム選択 | |
| # デフォルトで2分後のフレームを使用 | |
| default_time = 120 # 2分 = 120秒 | |
| default_frame = int(video_info['fps'] * default_time) | |
| # 動画の長さより長い場合は中間のフレームを使用 | |
| if default_frame >= video_info['total_frames']: | |
| default_frame = video_info['total_frames'] // 2 | |
| frame_number = st.slider( | |
| "プレビューフレームを選択", | |
| 0, | |
| video_info['total_frames'] - 1, | |
| default_frame, | |
| help="データが表示されているフレームを選択してください(デフォルト: 2分後)", | |
| key="input_frame_slider" | |
| ) | |
| # フレームを取得 | |
| frame = VideoProcessor.get_frame(video_path, frame_number) | |
| if frame is None: | |
| st.error("フレームを読み込めませんでした") | |
| return | |
| # フレームを表示 | |
| st.image(frame, caption=f"フレーム {frame_number}", use_container_width=True) | |
| # 1080p動画の参考設定を表示 | |
| if video_info['width'] == 1920 and video_info['height'] == 1080: | |
| with st.expander("💡 参考:ロケット動画の1080p用ROI設定例"): | |
| st.code(""" | |
| ROI 1 (Time): 80,130,120,40 | |
| ROI 2 (Speed): 1110,125,90,40 | |
| ROI 3 (Altitude): 1215,125,90,40 | |
| """) | |
| # ROI入力 | |
| st.subheader("ROI設定") | |
| num_rois = st.number_input("ROIの数", min_value=1, max_value=10, value=3, key="input_num_rois") | |
| rois = [] | |
| labels = [] | |
| for i in range(num_rois): | |
| st.markdown(f"**ROI {i+1}**") | |
| col1, col2 = st.columns(2) | |
| # 推奨ラベル | |
| default_labels = ["Time", "Speed", "Altitude", "Stage", "Velocity", "Apogee"] | |
| default_label = default_labels[i] if i < len(default_labels) else f"Telemetry_{i+1}" | |
| with col1: | |
| label = st.text_input( | |
| f"ラベル", | |
| value=default_label, | |
| key=f"input_label_{i}" | |
| ) | |
| with col2: | |
| roi_input = st.text_input( | |
| f"座標 (x,y,width,height)", | |
| placeholder="例: 1100,100,150,80", | |
| key=f"input_roi_{i}", | |
| help="画像上の左上座標(x,y)と幅、高さを指定" | |
| ) | |
| if roi_input: | |
| try: | |
| parts = [int(p.strip()) for p in roi_input.split(',')] | |
| if len(parts) == 4: | |
| rois.append(tuple(parts)) | |
| labels.append(label) | |
| # ROIをプレビュー | |
| x, y, w, h = parts | |
| if w > 0 and h > 0: | |
| roi_preview = frame[y:y+h, x:x+w] | |
| roi_preview_rgb = roi_preview | |
| st.image(roi_preview_rgb, caption=f"{label} プレビュー", width=300) | |
| except: | |
| st.warning(f"ROI {i+1}: 正しい形式で入力してください(例: 100,200,150,80)") | |
| # ROIを保存 | |
| if len(rois) > 0: | |
| st.session_state.rois = rois | |
| st.session_state.roi_labels = labels | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("← 戻る", key="input_back"): | |
| st.session_state.current_step = 1 | |
| st.rerun() | |
| with col2: | |
| if st.button("次へ:データ抽出 →", type="primary", key="input_next"): | |
| st.session_state.current_step = 3 | |
| st.rerun() | |
| else: | |
| if st.button("← 戻る", key="input_back_only"): | |
| st.session_state.current_step = 1 | |
| st.rerun() | |
| def step3_data_extraction(): | |
| """Step 3: データ抽出""" | |
| st.header("🔍 Step 3: データ抽出") | |
| # バッチモードかチェック | |
| batch_mode = st.session_state.get('batch_mode', False) | |
| if batch_mode: | |
| step3_batch_extraction() | |
| else: | |
| step3_single_extraction() | |
| def step3_single_extraction(): | |
| """単一動画のデータ抽出""" | |
| video_path = st.session_state.video_path | |
| video_info = st.session_state.video_info | |
| rois = st.session_state.rois | |
| labels = st.session_state.roi_labels | |
| # データが既に抽出済みかチェック | |
| if 'extracted_data' in st.session_state and st.session_state.extracted_data is not None: | |
| # 抽出完了状態 | |
| df = st.session_state.extracted_data | |
| st.success(f"✅ データ抽出が完了しました! {len(df)}レコードを抽出") | |
| # プレビュー表示 | |
| st.subheader("データプレビュー") | |
| st.dataframe(df.head(10), use_container_width=True) | |
| # ナビゲーションボタン | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| if st.button("← ROI選択に戻る", key="back_to_roi"): | |
| st.session_state.current_step = 2 | |
| # extracted_dataをクリア | |
| st.session_state.extracted_data = None | |
| st.rerun() | |
| with col2: | |
| if st.button("🔄 再抽出", key="re_extract"): | |
| # extracted_dataをクリア | |
| st.session_state.extracted_data = None | |
| st.rerun() | |
| with col3: | |
| if st.button("次へ:結果確認 →", type="primary", key="goto_results"): | |
| st.session_state.current_step = 4 | |
| st.rerun() | |
| return | |
| # 抽出設定 | |
| st.subheader("抽出設定") | |
| extraction_mode = st.radio( | |
| "抽出モード", | |
| ["時間間隔", "フレーム間隔"], | |
| horizontal=True, | |
| help="時間間隔: 何秒ごとに抽出 / フレーム間隔: 何フレームごとに抽出" | |
| ) | |
| if extraction_mode == "時間間隔": | |
| interval = st.slider( | |
| "抽出間隔(秒)", | |
| min_value=0.5, | |
| max_value=5.0, | |
| value=1.0, | |
| step=0.5, | |
| help="何秒ごとにデータを抽出するか" | |
| ) | |
| frame_interval = int(video_info['fps'] * interval) | |
| else: # フレーム間隔 | |
| frame_interval = st.slider( | |
| "抽出間隔(フレーム)", | |
| min_value=1, | |
| max_value=int(video_info['fps'] * 5), # 最大5秒分 | |
| value=1, | |
| step=1, | |
| help="何フレームごとにデータを抽出するか(1=全フレーム)" | |
| ) | |
| interval = frame_interval / video_info['fps'] | |
| st.info(f"📊 設定: {frame_interval}フレームごと(約{interval:.2f}秒間隔)") | |
| # ROI情報を表示 | |
| st.subheader("ROI確認") | |
| for i, (roi, label) in enumerate(zip(rois, labels)): | |
| st.text(f"{label}: x={roi[0]}, y={roi[1]}, w={roi[2]}, h={roi[3]}") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("← 戻る", key="back_from_extraction"): | |
| st.session_state.current_step = 2 | |
| st.rerun() | |
| with col2: | |
| start_extraction = st.button("データ抽出を開始", type="primary", key="start_extraction") | |
| if start_extraction: | |
| # OCR初期化 | |
| ocr_processor = st.session_state.ocr_processor | |
| ocr_processor.initialize() | |
| # データ抽出 | |
| cap = cv2.VideoCapture(video_path) | |
| fps = video_info['fps'] | |
| total_frames = video_info['total_frames'] | |
| # frame_intervalは既に上で計算済み | |
| data = [] | |
| progress_bar = st.progress(0) | |
| status_text = st.empty() | |
| frame_count = 0 | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # 指定間隔でのみ処理 | |
| if frame_count % frame_interval == 0: | |
| timestamp = frame_count / fps | |
| # BGRからRGBに変換 | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| # 各ROIに対してOCRを実行 | |
| row_data = {} | |
| for i, (roi, label) in enumerate(zip(rois, labels)): | |
| value, raw_text = ocr_processor.process_roi(frame_rgb, roi, label) | |
| row_data[f'{label}'] = value | |
| row_data[f'{label}_raw'] = raw_text | |
| data.append(row_data) | |
| # 進捗更新 (描画負荷軽減のため10レコードごとに更新) | |
| if len(data) % 10 == 0 or frame_count + frame_interval >= total_frames: | |
| progress = frame_count / total_frames | |
| progress_bar.progress(progress) | |
| status_text.text(f"進捗: {progress*100:.1f}% ({len(data)}レコード抽出)") | |
| frame_count += 1 | |
| cap.release() | |
| # DataFrameに変換 | |
| df = pd.DataFrame(data) | |
| # 後処理: 時間の外れ値補正 | |
| if 'Time' in df.columns: | |
| df = PostProcessor.correct_time_outliers(df, 'Time') | |
| st.session_state.extracted_data = df | |
| progress_bar.progress(1.0) | |
| status_text.text(f"✅ 完了: {len(df)}レコードを抽出しました") | |
| st.success(f"データ抽出が完了しました! {len(df)}レコードを抽出") | |
| st.info("🔄 ページが自動的に更新されます...") | |
| # 自動的に再描画 | |
| st.rerun() | |
| def step3_batch_extraction(): | |
| """複数動画のバッチデータ抽出""" | |
| video_paths = st.session_state.video_paths | |
| video_names = st.session_state.video_names | |
| video_infos = st.session_state.video_infos | |
| rois = st.session_state.rois | |
| labels = st.session_state.roi_labels | |
| # バッチ抽出済みかチェック | |
| if st.session_state.batch_extracted_data and len(st.session_state.batch_extracted_data) == len(video_paths): | |
| st.success(f"✅ バッチ抽出が完了しました! {len(video_paths)}個の動画を処理") | |
| # 結果サマリーを表示 | |
| st.subheader("抽出結果サマリー") | |
| for i, (name, path) in enumerate(zip(video_names, video_paths)): | |
| df = st.session_state.batch_extracted_data.get(path) | |
| if df is not None: | |
| col1, col2, col3 = st.columns([3, 1, 1]) | |
| with col1: | |
| st.text(f"{i+1}. {name}") | |
| with col2: | |
| st.text(f"{len(df)} レコード") | |
| with col3: | |
| if 'Time' in df.columns: | |
| st.text(f"{df['Time'].max():.1f}s") | |
| else: | |
| st.text("-") | |
| # ナビゲーションボタン | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| if st.button("← ROI選択に戻る", key="batch_back_to_roi"): | |
| st.session_state.current_step = 2 | |
| st.session_state.batch_extracted_data = {} | |
| st.rerun() | |
| with col2: | |
| if st.button("🔄 再抽出", key="batch_re_extract"): | |
| st.session_state.batch_extracted_data = {} | |
| st.rerun() | |
| with col3: | |
| if st.button("次へ:結果確認 →", type="primary", key="batch_goto_results"): | |
| st.session_state.current_step = 4 | |
| st.rerun() | |
| return | |
| # 抽出設定 | |
| st.subheader("抽出設定") | |
| extraction_mode = st.radio( | |
| "抽出モード", | |
| ["時間間隔", "フレーム間隔"], | |
| horizontal=True, | |
| help="時間間隔: 何秒ごとに抽出 / フレーム間隔: 何フレームごとに抽出", | |
| key="batch_extraction_mode" | |
| ) | |
| # 最初の動画のFPSを参考に使用(全動画で同じと仮定) | |
| ref_fps = video_infos[0]['fps'] | |
| if extraction_mode == "時間間隔": | |
| interval = st.slider( | |
| "抽出間隔(秒)", | |
| min_value=0.5, | |
| max_value=5.0, | |
| value=1.0, | |
| step=0.5, | |
| help="何秒ごとにデータを抽出するか", | |
| key="batch_interval_slider" | |
| ) | |
| frame_interval = int(ref_fps * interval) | |
| else: # フレーム間隔 | |
| frame_interval = st.slider( | |
| "抽出間隔(フレーム)", | |
| min_value=1, | |
| max_value=int(ref_fps * 5), # 最大5秒分 | |
| value=1, | |
| step=1, | |
| help="何フレームごとにデータを抽出するか(1=全フレーム)", | |
| key="batch_frame_interval_slider" | |
| ) | |
| interval = frame_interval / ref_fps | |
| st.info(f"📊 設定: {frame_interval}フレームごと(約{interval:.2f}秒間隔)") | |
| # ROI情報を表示 | |
| st.subheader("バッチ処理確認") | |
| st.text(f"処理動画数: {len(video_paths)}") | |
| for i, (roi, label) in enumerate(zip(rois, labels)): | |
| st.text(f"{label}: x={roi[0]}, y={roi[1]}, w={roi[2]}, h={roi[3]}") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("← 戻る", key="batch_back_from_extraction"): | |
| st.session_state.current_step = 2 | |
| st.rerun() | |
| with col2: | |
| start_extraction = st.button("バッチ抽出を開始", type="primary", key="start_batch_extraction") | |
| if start_extraction: | |
| # OCR初期化 | |
| ocr_processor = st.session_state.ocr_processor | |
| ocr_processor.initialize() | |
| batch_data = {} | |
| # 全体の進捗バー | |
| overall_progress = st.progress(0) | |
| overall_status = st.empty() | |
| # 各動画を処理 | |
| for video_idx, (video_path, video_name, video_info) in enumerate(zip(video_paths, video_names, video_infos)): | |
| overall_status.text(f"📹 処理中: {video_name} ({video_idx+1}/{len(video_paths)})") | |
| # 動画ごとの進捗バー | |
| video_progress = st.progress(0) | |
| video_status = st.empty() | |
| # データ抽出 | |
| cap = cv2.VideoCapture(video_path) | |
| fps = video_info['fps'] | |
| total_frames = video_info['total_frames'] | |
| # 時間間隔モードの場合は動画ごとにframe_intervalを計算 | |
| # フレーム間隔モードの場合はframe_intervalを使用 | |
| if extraction_mode == "時間間隔": | |
| video_frame_interval = int(fps * interval) | |
| else: | |
| video_frame_interval = frame_interval | |
| data = [] | |
| frame_count = 0 | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # 指定間隔でのみ処理 | |
| if frame_count % video_frame_interval == 0: | |
| timestamp = frame_count / fps | |
| # BGRからRGBに変換 | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| # 各ROIに対してOCRを実行 | |
| row_data = {} | |
| for i, (roi, label) in enumerate(zip(rois, labels)): | |
| value, raw_text = ocr_processor.process_roi(frame_rgb, roi, label) | |
| row_data[f'{label}'] = value | |
| row_data[f'{label}_raw'] = raw_text | |
| data.append(row_data) | |
| # 進捗更新 (10レコードごとに更新) | |
| if len(data) % 10 == 0 or frame_count + video_frame_interval >= total_frames: | |
| progress = frame_count / total_frames | |
| video_progress.progress(progress) | |
| video_status.text(f" 進捗: {progress*100:.1f}% ({len(data)}レコード)") | |
| frame_count += 1 | |
| cap.release() | |
| # DataFrameに変換して保存 | |
| df = pd.DataFrame(data) | |
| # 後処理: 時間の外れ値補正 | |
| if 'Time' in df.columns: | |
| df = PostProcessor.correct_time_outliers(df, 'Time') | |
| batch_data[video_path] = df | |
| video_progress.progress(1.0) | |
| video_status.text(f" ✅ 完了: {len(df)}レコード") | |
| # 全体の進捗更新 | |
| overall_progress.progress((video_idx + 1) / len(video_paths)) | |
| # バッチデータを保存 | |
| st.session_state.batch_extracted_data = batch_data | |
| overall_status.text(f"✅ 全{len(video_paths)}個の動画の処理が完了しました!") | |
| st.success(f"バッチ抽出が完了しました!") | |
| st.info("🔄 ページが自動的に更新されます...") | |
| # 自動的に再描画 | |
| st.rerun() | |
| def step4_result_confirmation(): | |
| """Step 4: 結果確認 (グラフ表示)""" | |
| st.header("📊 Step 4: 結果確認") | |
| batch_mode = st.session_state.get('batch_mode', False) | |
| if batch_mode: | |
| batch_data = st.session_state.batch_extracted_data | |
| video_names = st.session_state.video_names | |
| if not batch_data: | |
| st.error("抽出データがありません") | |
| return | |
| selected_video = st.selectbox("確認する動画を選択", video_names) | |
| video_idx = video_names.index(selected_video) | |
| video_paths = st.session_state.video_paths | |
| df = batch_data.get(video_paths[video_idx]) | |
| else: | |
| df = st.session_state.extracted_data | |
| if df is None or df.empty: | |
| st.error("抽出データがありません") | |
| return | |
| # データプレビュー | |
| st.subheader("データプレビュー") | |
| st.dataframe(df.head(10), use_container_width=True) | |
| # グラフ表示 | |
| if 'Time' in df.columns: | |
| st.subheader("テレメトリグラフ") | |
| # Speed グラフ | |
| speed_col = next((c for c in df.columns if 'speed' in c.lower()), None) | |
| if speed_col: | |
| st.markdown("### 🚀 Time vs Speed") | |
| st.line_chart(df.set_index('Time')[speed_col]) | |
| # Altitude グラフ | |
| alt_col = next((c for c in df.columns if 'altitude' in c.lower()), None) | |
| if alt_col: | |
| st.markdown("### 🏔️ Time vs Altitude") | |
| st.line_chart(df.set_index('Time')[alt_col]) | |
| else: | |
| st.warning("⚠️ 'Time' 列が見つからないため、グラフを表示できません") | |
| st.markdown("---") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("← ステップ3に戻る"): | |
| st.session_state.current_step = 3 | |
| st.rerun() | |
| with col2: | |
| if st.button("次へ:ダウンロード →", type="primary"): | |
| st.session_state.current_step = 5 | |
| st.rerun() | |
| def step5_download(): | |
| """Step 5: ダウンロード""" | |
| st.header("📂 Step 5: ダウンロード") | |
| batch_mode = st.session_state.get('batch_mode', False) | |
| if batch_mode: | |
| step5_batch_download() | |
| else: | |
| step5_single_download() | |
| def step5_single_download(): | |
| """単一動画のダウンロード""" | |
| df = st.session_state.extracted_data | |
| if df is None or df.empty: | |
| st.error("抽出データがありません") | |
| return | |
| # CSVダウンロード | |
| st.subheader("📥 CSVダウンロード") | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| csv_filename = f"telemetry_{timestamp}.csv" | |
| csv_data = df.to_csv(index=False, encoding='utf-8-sig').encode('utf-8-sig') | |
| st.download_button( | |
| label="CSVをダウンロード", | |
| data=csv_data, | |
| file_name=csv_filename, | |
| mime="text/csv", | |
| type="primary" | |
| ) | |
| # グラフダウンロード (画像として) | |
| if 'Time' in df.columns: | |
| st.subheader("🖼️ グラフをダウンロード") | |
| # Speed グラフ | |
| speed_col = next((c for c in df.columns if 'speed' in c.lower()), None) | |
| if speed_col: | |
| fig_speed, ax_speed = plt.subplots(figsize=(10, 5)) | |
| ax_speed.plot(df['Time'], df[speed_col]) | |
| ax_speed.set_title('Time vs Speed') | |
| ax_speed.set_xlabel('Time (s)') | |
| ax_speed.set_ylabel('Speed') | |
| ax_speed.grid(True) | |
| buf_speed = io.BytesIO() | |
| fig_speed.savefig(buf_speed, format="png") | |
| st.download_button( | |
| label="Speedグラフを保存", | |
| data=buf_speed.getvalue(), | |
| file_name=f"speed_{timestamp}.png", | |
| mime="image/png" | |
| ) | |
| plt.close(fig_speed) | |
| # Altitude グラフ | |
| alt_col = next((c for c in df.columns if 'altitude' in c.lower()), None) | |
| if alt_col: | |
| fig_alt, ax_alt = plt.subplots(figsize=(10, 5)) | |
| ax_alt.plot(df['Time'], df[alt_col]) | |
| ax_alt.set_title('Time vs Altitude') | |
| ax_alt.set_xlabel('Time (s)') | |
| ax_alt.set_ylabel('Altitude') | |
| ax_alt.grid(True) | |
| buf_alt = io.BytesIO() | |
| fig_alt.savefig(buf_alt, format="png") | |
| st.download_button( | |
| label="Altitudeグラフを保存", | |
| data=buf_alt.getvalue(), | |
| file_name=f"altitude_{timestamp}.png", | |
| mime="image/png" | |
| ) | |
| plt.close(fig_alt) | |
| st.markdown("---") | |
| if st.button("🔄 最初からやり直す"): | |
| for key in list(st.session_state.keys()): | |
| del st.session_state[key] | |
| st.rerun() | |
| def step5_batch_download(): | |
| """複数動画の一括ダウンロード""" | |
| batch_data = st.session_state.batch_extracted_data | |
| video_names = st.session_state.video_names | |
| video_paths = st.session_state.video_paths | |
| if not batch_data: | |
| st.error("抽出データがありません") | |
| return | |
| st.subheader("📦 ZIP形式で一括ダウンロード") | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| zip_filename = f"batch_telemetry_{timestamp}.zip" | |
| zip_buffer = io.BytesIO() | |
| with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: | |
| for i, (path, df) in enumerate(batch_data.items()): | |
| v_name = os.path.splitext(video_names[i])[0] | |
| # CSVを追加 | |
| csv_str = df.to_csv(index=False, encoding='utf-8-sig') | |
| zip_file.writestr(f"{v_name}_{timestamp}.csv", csv_str) | |
| # グラフを追加 | |
| if 'Time' in df.columns: | |
| speed_col = next((c for c in df.columns if 'speed' in c.lower()), None) | |
| if speed_col: | |
| fig, ax = plt.subplots(figsize=(10, 5)) | |
| ax.plot(df['Time'], df[speed_col]) | |
| ax.set_title(f'Time vs Speed - {v_name}') | |
| ax.set_xlabel('Time (s)') | |
| ax.set_ylabel('Speed') | |
| ax.grid(True) | |
| buf = io.BytesIO() | |
| fig.savefig(buf, format="png") | |
| zip_file.writestr(f"{v_name}_speed_{timestamp}.png", buf.getvalue()) | |
| plt.close(fig) | |
| alt_col = next((c for c in df.columns if 'altitude' in c.lower()), None) | |
| if alt_col: | |
| fig, ax = plt.subplots(figsize=(10, 5)) | |
| ax.plot(df['Time'], df[alt_col]) | |
| ax.set_title(f'Time vs Altitude - {v_name}') | |
| ax.set_xlabel('Time (s)') | |
| ax.set_ylabel('Altitude') | |
| ax.grid(True) | |
| buf = io.BytesIO() | |
| fig.savefig(buf, format="png") | |
| zip_file.writestr(f"{v_name}_altitude_{timestamp}.png", buf.getvalue()) | |
| plt.close(fig) | |
| zip_buffer.seek(0) | |
| st.download_button( | |
| label="全データをZIPでダウンロード", | |
| data=zip_buffer.getvalue(), | |
| file_name=zip_filename, | |
| mime="application/zip", | |
| type="primary" | |
| ) | |
| st.markdown("---") | |
| if st.button("🔄 最初からやり直す", key="batch_restart"): | |
| for key in list(st.session_state.keys()): | |
| del st.session_state[key] | |
| st.rerun() | |
| if __name__ == "__main__": | |
| main() | |