|
|
import gradio as gr |
|
|
import os |
|
|
from PIL import Image |
|
|
from utils.pose_utils import initialize_dwpose, safe_detect_pose |
|
|
from utils.notifications import notify_success, notify_error, NotificationMessages |
|
|
from utils.coordinate_system import update_coordinate_system |
|
|
from utils.image_processing import process_uploaded_image |
|
|
from utils.export_utils import export_pose_as_image, export_pose_as_json, get_timestamp_filename |
|
|
from utils.model_setup import download_models as _download_models_setup |
|
|
import json |
|
|
import tempfile |
|
|
import base64 |
|
|
import io |
|
|
import time |
|
|
|
|
|
|
|
|
# Module-level editor state shared by the Gradio callbacks in this file.

# Frames currently loaded in the editor: None until a pose is loaded, then a
# list of dicts shaped like {'people': [...], 'metadata': {'resolution': [w, h]}}.
_current_poses = None

# Index into _current_poses of the frame being edited/exported.
_current_frame_index = 0

# NOTE(review): not referenced by any code visible in this section of the file.
_current_pose_data = None

# Guard flag set by on_js_pose_update while it is processing a payload.
_is_updating = False
|
|
|
|
|
def load_javascript():
    """Return the pose-editor JavaScript wrapped in a ``<script>`` element.

    The script is read from ``static/pose_editor.js`` next to this module.
    If that file does not exist, an error is printed and a stub script that
    only logs the problem to the browser console is returned instead.
    """
    script_file = os.path.join(os.path.dirname(__file__), "static", "pose_editor.js")
    try:
        with open(script_file, "r", encoding="utf-8") as handle:
            source = handle.read()
    except FileNotFoundError:
        print(f"[ERROR] JavaScript file not found: {script_file}")
        return "<script>console.error('pose_editor.js not found');</script>"
    return f"<script>{source}</script>"
|
|
|
|
|
def image_to_base64(image, max_size=640):
    """Encode a PIL image as a PNG ``data:`` URL, downscaled for the canvas.

    Args:
        image: PIL image to encode, or ``None``.
        max_size: Maximum width/height in pixels of the encoded preview; the
            aspect ratio is preserved. Defaults to 640.

    Returns:
        A ``data:image/png;base64,...`` string, or ``None`` when ``image`` is
        ``None``.
    """
    if image is None:
        return None

    # Image.thumbnail() resizes in place, so shrink a copy and leave the
    # caller's image at its original size.
    preview = image.copy()
    preview.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

    buffer = io.BytesIO()
    preview.save(buffer, format='PNG')
    encoded = base64.b64encode(buffer.getvalue()).decode()
    return f"data:image/png;base64,{encoded}"
|
|
|
|
|
def main():
    """Initialise DWPose and build the "Character OpenPose Editor" Gradio UI.

    A failed DWPose initialisation is only reported as a warning on stdout;
    the interface is constructed regardless.

    NOTE(review): indentation was lost in the reviewed source, so the nesting
    of the layout containers below is reconstructed from the component order
    and should be confirmed against the running UI.
    """
    dwpose_ready, dwpose_message = initialize_dwpose()
    if not dwpose_ready:
        print(f"警告: {dwpose_message}")

    # Standing-pose templates for 2- to 7-head-tall characters.
    template_choices = [f"{heads}頭身立ちポーズ" for heads in range(2, 8)]
    # The width and height inputs differ only in their label.
    size_field_options = dict(value=512, minimum=64, maximum=2048, step=64, scale=1, min_width=100)
    # The two visibility checkboxes share their compact layout settings.
    toggle_layout = dict(container=False, scale=0, min_width=90)

    with gr.Blocks(title="Character OpenPose Editor", head=load_javascript()) as demo:
        gr.Markdown("# Character OpenPose Editor")
        gr.Markdown("2頭身・3頭身キャラクター向けのOpenPose・DWPoseのポーズ編集ツール")

        with gr.Row():
            # ---- Left column: inputs -------------------------------------
            with gr.Column(scale=1):
                gr.Markdown("### 入力")

                input_image = gr.Image(label="参考画像", type="pil", elem_id="input_image")

                template_dropdown = gr.Dropdown(
                    label="テンプレート",
                    choices=template_choices,
                    value="3頭身立ちポーズ",
                )

                template_update_btn = gr.Button("🔄 テンプレートに更新", variant="secondary")

                json_upload = gr.File(
                    label="JSONファイルをロード",
                    file_types=[".json"],
                    elem_id="json_upload",
                )

            # ---- Middle column: pose editor ------------------------------
            with gr.Column(scale=2):
                gr.Markdown("### ポーズエディター")

                with gr.Row(equal_height=True):
                    with gr.Column(scale=0, min_width=240):
                        gr.Markdown("**表示設定**")
                        with gr.Row():
                            draw_hand = gr.Checkbox(label="手を描画", value=True, **toggle_layout)
                            draw_face = gr.Checkbox(label="顔を描画", value=False, **toggle_layout)
                    with gr.Column(scale=1, min_width=160):
                        gr.Markdown("**編集モード**")
                        edit_mode = gr.Radio(
                            choices=["簡易モード", "詳細モード"],
                            value="簡易モード",
                            container=False,
                        )

                # Canvas element that the injected pose_editor.js draws on.
                pose_canvas = gr.HTML(
                    elem_id="pose_canvas_container",
                    value='<canvas id="pose_canvas" width="640" height="640" style="border: 1px solid #ccc;"></canvas>',
                )

                # Hidden holder for the current pose shown on the canvas.
                pose_data = gr.JSON(visible=False, value={})

                # Hidden textbox the browser side writes edited poses into.
                js_pose_update = gr.Textbox(visible=False, elem_id="js_pose_update")

            # ---- Right column: output ------------------------------------
            with gr.Column(scale=1):
                gr.Markdown("### 出力")

                with gr.Row():
                    canvas_width = gr.Number(label="幅", **size_field_options)
                    canvas_height = gr.Number(label="高さ", **size_field_options)

                canvas_update_btn = gr.Button("画像サイズのUpdate", variant="secondary")

                output_image = gr.Image(
                    label="ポーズ画像",
                    type="pil",
                    elem_id="output_image",
                    visible=False,
                )

                download_image_btn = gr.Button("📥 画像をダウンロード", variant="secondary")
                download_image_file = gr.File(label="画像ファイル", visible=False)

                output_json = gr.JSON(
                    label="ポーズデータ (JSON)",
                    elem_id="output_json",
                    visible=False,
                )

                download_json_btn = gr.Button("📥 JSONをダウンロード", variant="secondary")
                download_json_file = gr.File(label="JSONファイル", visible=False)
|
|
|
|
|
|
|
|
def on_image_upload(image):
    """Detect a pose in an uploaded reference image and load it into the editor.

    On success the module-level frame list is replaced with a single frame in
    "people" format and the frame index is reset to 0.

    Args:
        image: The uploaded PIL image, or ``None`` when the input was cleared.

    Returns:
        A 5-tuple for the outputs this callback is wired to
        (``output_json``, ``pose_data``, ``js_executor``, ``canvas_width``,
        ``canvas_height``).  On success the first two entries are the
        detection result (with a ``background_image`` data URL added); when
        there is no image or no usable detection result they are ``None`` and
        ``{}`` and the remaining outputs are left unchanged.
    """
    global _current_poses, _current_frame_index

    if image is None:
        return None, {}, gr.update(), gr.update(), gr.update()

    print(f"[DEBUG] 🖼️ Image upload detected: {type(image)}")

    # Only the reported original size is used below; detection itself runs on
    # the image as uploaded.
    processed_image, original_size, scale_info = process_uploaded_image(image)
    print(f"[DEBUG] 📐 Image processed: original_size={original_size}, scale_info={scale_info}")

    pose_result = safe_detect_pose(image)
    print(f"[DEBUG] 🤖 Pose detection result type: {type(pose_result)}")

    if pose_result is not None:
        result_keys = list(pose_result.keys()) if isinstance(pose_result, dict) else 'Not a dict'
        print(f"[DEBUG] 📊 Pose result keys: {result_keys}")

    # A non-dict result cannot be converted below; treat it like a failed
    # detection instead of crashing on ``.get``.
    if not isinstance(pose_result, dict) or not pose_result:
        print(f"[DEBUG] ❌ Pose detection failed")
        return None, {}, gr.update(), gr.update(), gr.update()

    bodies = pose_result.get('bodies') or {}
    candidates = bodies['candidate'] if 'candidate' in bodies else []

    if 'candidate' in bodies:
        print(f"[DEBUG] 🎯 Candidates count: {len(candidates)}")
        print(f"[DEBUG] 📍 First 3 candidates: {candidates[:3]}")
        valid_count = sum(1 for c in candidates if c and len(c) >= 2 and c[0] > 0 and c[1] > 0)
        zero_count = sum(1 for c in candidates if c and len(c) >= 2 and (c[0] == 0 or c[1] == 0))
        print(f"[DEBUG] ✅ Valid candidates: {valid_count}, 🚫 Zero coordinates: {zero_count}")

    # Convert the bodies/hands/faces result into a single "people" entry.
    hands = pose_result.get('hands')
    faces = pose_result.get('faces')
    person_data = {
        "pose_keypoints_2d": [],
        "hand_left_keypoints_2d": hands[0] if hands else [],
        "hand_right_keypoints_2d": hands[1] if hands and len(hands) > 1 else [],
        "face_keypoints_2d": faces[0] if faces else [],
    }
    for candidate in candidates:
        if candidate and len(candidate) >= 2:
            # Candidates without a third value get confidence 1.0.
            confidence = candidate[2] if len(candidate) > 2 else 1.0
            person_data["pose_keypoints_2d"].extend([candidate[0], candidate[1], confidence])

    _current_poses = [{
        'people': [person_data],
        'metadata': {
            'resolution': pose_result.get('resolution', [512, 512]),
        },
    }]
    _current_frame_index = 0
    print(f"[DEBUG] ✅ グローバル変数更新完了(画像アップロード・refs互換)")

    # Ship the reference image to the browser as a data URL.
    pose_result['background_image'] = image_to_base64(image)

    # Follow the uploaded image's size, clamped to the 64..2048 range of the
    # size inputs; fall back to 512x512 when the size cannot be read.
    try:
        new_w = max(64, min(2048, int(original_size[0]))) if original_size else 512
        new_h = max(64, min(2048, int(original_size[1]))) if original_size else 512
    except (TypeError, ValueError, IndexError, KeyError):
        new_w, new_h = 512, 512

    js_code = f"setTimeout(() => updateCanvasResolution({new_w}, {new_h}), 100);"

    return (
        pose_result,
        pose_result,
        gr.update(value=f"<script>{js_code}</script>"),
        gr.update(value=new_w),
        gr.update(value=new_h),
    )
|
|
|
|
|
def on_canvas_size_update(width, height):
    """Apply a new output canvas resolution.

    Both values fall back to 512 when empty/zero and are clamped to 64..2048.
    The coordinate system helper is updated, the active frame's stored
    resolution (if there is one) is overwritten, and a script calling
    ``updateCanvasResolution`` is returned for the ``js_executor`` output.

    Args:
        width: Requested canvas width.
        height: Requested canvas height.

    Returns:
        A Gradio update carrying the script, or a no-op update when anything
        failed (the failure is reported through ``notify_error``).
    """
    global _current_poses, _current_frame_index
    try:
        width = max(64, min(2048, int(width) if width else 512))
        height = max(64, min(2048, int(height) if height else 512))

        update_coordinate_system((width, height), (640, 640))

        # Record the resolution on the active frame when one exists.  This is
        # best effort: an unexpected frame shape is skipped explicitly rather
        # than by swallowing every exception.
        if _current_poses and 0 <= _current_frame_index < len(_current_poses):
            frame = _current_poses[_current_frame_index]
            metadata = frame.get('metadata') if isinstance(frame, dict) else None
            if isinstance(metadata, dict):
                metadata['resolution'] = [width, height]

        js_code = f"updateCanvasResolution({width}, {height});"

        notify_success(f"Canvas解像度を{width}x{height}に更新しました")
        return gr.update(value=f"<script>{js_code}</script>")

    except Exception as e:
        # UI callback boundary: report the problem instead of raising.
        notify_error(f"Canvas解像度更新に失敗しました: {str(e)}")
        return gr.update()
|
|
|
|
|
def on_display_settings_change(draw_hand, draw_face, edit_mode):
    """Build a script that forwards the display settings to the browser.

    The original docstring states that the JavaScript side watches these
    controls directly and that this callback is currently unused; it is
    nevertheless still registered on the controls' change events in this file.

    Args:
        draw_hand: Whether hand keypoints should be drawn.
        draw_face: Whether face keypoints should be drawn.
        edit_mode: Selected edit-mode label.

    Returns:
        A Gradio update whose value is a ``<script>`` element calling
        ``window.updateDisplaySettings`` when that function exists.
    """
    def _js_literal(value):
        # json.dumps yields a valid JavaScript literal with quotes and
        # backslashes escaped; "</" is additionally broken up so the value can
        # never terminate the surrounding <script> element.
        return json.dumps(value, ensure_ascii=False).replace("</", "<\\/")

    hand_flag = _js_literal(bool(draw_hand))
    face_flag = _js_literal(bool(draw_face))
    mode_literal = _js_literal(edit_mode)

    js_code = (
        "if (window.updateDisplaySettings) { "
        f"window.updateDisplaySettings({hand_flag}, {face_flag}, {mode_literal}); "
        "}"
    )
    return gr.update(value=f"<script>{js_code}</script>")
|
|
|
|
|
def load_template_pose(template_name):
    """Load a pose template and make it the current (single) frame.

    Templates named in the file map are read from ``poses/<file>.json`` next
    to this module; any other name is looked up in ``templates/poses.json``
    through a small legacy key map.

    Args:
        template_name: Label selected in the template dropdown.

    Returns:
        A 5-tuple for the outputs this callback is wired to
        (``output_json``, ``pose_data``, ``js_executor``, ``canvas_width``,
        ``canvas_height``).  Every failure path returns
        ``(None, {}, no-op, no-op, no-op)`` after calling ``notify_error``.
    """
    global _current_poses, _current_frame_index

    def _no_change():
        # Always five values: the event has five outputs.
        return None, {}, gr.update(), gr.update(), gr.update()

    template_file_map = {
        "2頭身立ちポーズ": "2heads.json",
        "3頭身立ちポーズ": "3heads.json",
        "4頭身立ちポーズ": "4heads.json",
        "5頭身立ちポーズ": "5heads.json",
        "6頭身立ちポーズ": "6heads.json",
        "7頭身立ちポーズ": "7heads.json",
    }
    legacy_key_map = {
        "2頭身立ちポーズ": "2_head_standing",
        "2頭身座りポーズ": "2_head_sitting",
    }
    # Loading a template always resets the canvas to this size.
    target_w, target_h = 512, 512

    try:
        base_dir = os.path.dirname(__file__)

        if template_name in template_file_map:
            template_path = os.path.join(base_dir, "poses", template_file_map[template_name])
            with open(template_path, "r", encoding="utf-8") as f:
                pose_data = json.load(f)
        else:
            template_path = os.path.join(base_dir, "templates", "poses.json")
            with open(template_path, "r", encoding="utf-8") as f:
                templates = json.load(f)

            template_key = legacy_key_map.get(template_name)
            if template_key and template_key in templates["poses"]:
                pose_data = templates["poses"][template_key]["data"]
            else:
                notify_error("テンプレートが見つかりません")
                return _no_change()

        if not pose_data:
            notify_error("テンプレートが見つかりません")
            return _no_change()

        # Convert the bodies/hands/faces template into a "people" entry.
        hands = pose_data.get('hands')
        faces = pose_data.get('faces')
        person_data = {
            "pose_keypoints_2d": [],
            "hand_left_keypoints_2d": hands[0] if hands else [],
            "hand_right_keypoints_2d": hands[1] if hands and len(hands) > 1 else [],
            "face_keypoints_2d": faces[0] if faces else [],
        }
        if 'bodies' in pose_data and 'candidate' in pose_data['bodies']:
            for candidate in pose_data['bodies']['candidate']:
                if candidate and len(candidate) >= 2:
                    # Candidates without a third value get confidence 1.0.
                    confidence = candidate[2] if len(candidate) > 2 else 1.0
                    person_data["pose_keypoints_2d"].extend([candidate[0], candidate[1], confidence])

        # The stored frame uses the reset canvas size, while the payload sent
        # to the browser keeps the template's own resolution.
        _current_poses = [{
            'people': [person_data],
            'metadata': {
                'resolution': [target_w, target_h],
            },
        }]
        _current_frame_index = 0

        people_format_data = {
            'people': [person_data],
            'bodies': pose_data.get('bodies', {}),
            'hands': pose_data.get('hands', []),
            'faces': pose_data.get('faces', []),
            'metadata': {
                'resolution': pose_data.get('resolution', [512, 512]),
            },
            'is_template_load': True,
        }

        js_code = f"setTimeout(() => updateCanvasResolution({target_w}, {target_h}), 100);"

        notify_success(f"{template_name}を読み込みました")
        return (
            people_format_data,
            people_format_data,
            gr.update(value=f"<script>{js_code}</script>"),
            gr.update(value=target_w),
            gr.update(value=target_h),
        )

    except Exception as e:
        # UI callback boundary: report the problem instead of raising.
        notify_error(f"テンプレート読み込みに失敗しました: {str(e)}")
        return _no_change()
|
|
|
|
|
def export_image(pose_data, draw_hand, draw_face, width, height):
    """Render the current pose to a PNG file and expose it for download.

    The first person of the active frame in the module-level frame list is
    preferred; ``pose_data`` is only used when no such person exists.

    Args:
        pose_data: Pose held by the hidden JSON component (fallback source).
        draw_hand: Passed to the renderer as ``enable_hands``.
        draw_face: Passed to the renderer as ``enable_face``.
        width: Output width; empty/zero means 512, clamped to 64..2048.
        height: Output height; empty/zero means 512, clamped to 64..2048.

    Returns:
        A Gradio update that shows the file component with the written file,
        or hides it when there is nothing to export or rendering failed.
    """
    global _current_poses, _current_frame_index

    has_active_frame = bool(_current_poses) and 0 <= _current_frame_index < len(_current_poses)

    # Prefer the state kept on the Python side over the component value.
    export_data = None
    if has_active_frame:
        frame = _current_poses[_current_frame_index]
        people = frame['people']
        if people and people[0]:
            export_data = {
                'people': [people[0]],
                'resolution': frame['metadata'].get('resolution', [512, 512]),
            }
    if export_data is None:
        export_data = pose_data

    print(f"[DEBUG] 🖼️ 画像ダウンロードボタンクリック")
    print(f"[DEBUG] 📊 グローバルデータ: {bool(_current_poses)}, 引数データ: {bool(pose_data)}")
    print(f"[DEBUG] 📊 使用データ: {bool(export_data)}, 手: {draw_hand}, 顔: {draw_face}")

    if export_data:
        data_is_dict = isinstance(export_data, dict)
        key_summary = list(export_data.keys()) if data_is_dict else 'not a dict'
        print(f"[DEBUG] 📊 エクスポートデータキー: {key_summary}")
        if data_is_dict:
            for key, value in export_data.items():
                shown = len(value) if isinstance(value, (list, dict)) else value
                print(f"[DEBUG] 📊 {key}: {type(value)} - {shown}")

    if not _current_poses:
        print(f"[DEBUG] 🔍 グローバルデータがNone")
    else:
        print(f"[DEBUG] 🔍 グローバルデータ詳細:")
        print(f"[DEBUG] 🔍 - Type: {type(_current_poses)}")
        print(f"[DEBUG] 🔍 - Frames: {len(_current_poses)}")
        if has_active_frame:
            frame = _current_poses[_current_frame_index]
            frame_has_people = bool(frame['people']) if 'people' in frame else False
            print(f"[DEBUG] 🔍 - Current frame: {frame_has_people}")
            if frame.get('people'):
                first_person = frame['people'][0]
                person_keys = list(first_person.keys()) if first_person else 'None'
                print(f"[DEBUG] 🔍 - Person keys: {person_keys}")

    if not export_data:
        notify_error("エクスポートするポーズデータがありません")
        print(f"[DEBUG] ❌ データなし")
        return gr.update(visible=False)

    try:
        canvas_width = max(64, min(2048, int(width) if width else 512))
        canvas_height = max(64, min(2048, int(height) if height else 512))

        print(f"[DEBUG] 🎨 画像生成開始: canvas_size=({canvas_width}, {canvas_height})")
        if 'bodies' in export_data and 'candidate' in export_data['bodies']:
            candidates = export_data['bodies']['candidate']
            print(f"[DEBUG] 🎨 Candidates for image: {len(candidates) if candidates else 0}")
            if candidates:
                valid_total = 0
                for c in candidates:
                    if c and len(c) >= 2 and c[0] > 0 and c[1] > 0:
                        valid_total += 1
                print(f"[DEBUG] 🎨 Valid candidates: {valid_total}")

        image = export_pose_as_image(
            export_data,
            canvas_size=(canvas_width, canvas_height),
            enable_hands=draw_hand,
            enable_face=draw_face,
        )
        if image is None:
            print(f"[DEBUG] ❌ 画像生成失敗 - export_pose_as_image returned None")
            return gr.update(visible=False)

        # Write into the system temp directory under a timestamped name.
        filename = get_timestamp_filename("dwpose_edit", "png")
        temp_path = os.path.join(tempfile.gettempdir(), filename)
        image.save(temp_path)

        print(f"[DEBUG] ✅ ファイル準備完了: {temp_path}")
        notify_success(f"画像をエクスポートしました: {filename}")

        return gr.update(value=temp_path, visible=True)

    except Exception as e:
        print(f"[DEBUG] ❌ エラー: {e}")
        notify_error(f"画像エクスポートエラー: {e}")
        return gr.update(visible=False)
|
|
|
|
|
def export_json(pose_data):
    """Write the current pose as a JSON file and expose it for download.

    The active frame of the module-level frame list is preferred (all of its
    people plus its stored resolution); ``pose_data`` is only used when that
    frame has no usable first person.

    Args:
        pose_data: Pose held by the hidden JSON component (fallback source).

    Returns:
        A Gradio update that shows the file component with the written file,
        or hides it when there is nothing to export or serialisation failed.
    """
    global _current_poses, _current_frame_index

    # Prefer the state kept on the Python side over the component value.
    export_data = None
    if _current_poses and 0 <= _current_frame_index < len(_current_poses):
        frame = _current_poses[_current_frame_index]
        people = frame['people']
        if people and people[0]:
            export_data = {
                'people': people,
                'resolution': frame['metadata'].get('resolution', [512, 512]),
            }
    if export_data is None:
        export_data = pose_data

    print(f"[DEBUG] 📥 JSONダウンロードボタンクリック(people形式)")
    print(f"[DEBUG] 📊 グローバルデータ: {bool(_current_poses)}, 引数データ: {bool(pose_data)}")
    print(f"[DEBUG] 📊 使用データ: {bool(export_data)}")

    if not export_data:
        notify_error("エクスポートするポーズデータがありません")
        print(f"[DEBUG] ❌ データなし")
        return gr.update(visible=False)

    try:
        json_str = export_pose_as_json(export_data)
        if not json_str:
            print(f"[DEBUG] ❌ JSON生成失敗")
            return gr.update(visible=False)

        # Write into the system temp directory under a timestamped name.
        filename = get_timestamp_filename("dwpose_data", "json")
        temp_path = os.path.join(tempfile.gettempdir(), filename)
        with open(temp_path, 'w', encoding='utf-8') as out_file:
            out_file.write(json_str)

        print(f"[DEBUG] ✅ ファイル準備完了: {temp_path}")
        notify_success(f"people形式JSONをエクスポートしました: {filename}")

        return gr.update(value=temp_path, visible=True)

    except Exception as e:
        print(f"[DEBUG] ❌ エラー: {e}")
        notify_error(f"JSONエクスポートエラー: {e}")
        return gr.update(visible=False)
|
|
|
|
|
def on_js_pose_update(js_pose_str):
    """Store a pose edited in the browser and return it in display format.

    The JSON payload's first ``people`` entry replaces the people of the
    active frame (creating the frame list or a new frame when needed).  The
    active frame is then converted to the bodies/hands/faces structure.

    Args:
        js_pose_str: JSON text written by the browser into the hidden textbox.

    Returns:
        ``(display_data, "")`` on success, where the empty string clears the
        textbox.  ``(no-op update, "")`` when the payload is empty, another
        update is in progress, the JSON is invalid, or no frame is available.
    """
    global _current_poses, _current_frame_index, _is_updating

    update_timestamp = int(time.time() * 1000)
    print(f"[DEBUG] 🔄 JavaScript→Python データ転送開始: {len(js_pose_str) if js_pose_str else 0}文字, timestamp={update_timestamp}")
    print(f"[DEBUG] 🔍 更新フラグ状態: _is_updating={_is_updating}")
    print(f"[DEBUG] 🔍 グローバル状態: _current_poses={bool(_current_poses)}, _current_frame_index={_current_frame_index}")

    if _is_updating:
        print(f"[DEBUG] ⚠️ データ更新処理中のため、新しい要求をスキップ (timestamp={update_timestamp})")
        return gr.update(), ""

    if not js_pose_str or js_pose_str.strip() == "":
        return gr.update(), ""

    _is_updating = True
    print(f"[DEBUG] 🔒 データ更新フラグ設定: _is_updating={_is_updating} (timestamp={update_timestamp})")

    try:
        # Parse once; the same object feeds the debug summary and the update.
        canvas_data = json.loads(js_pose_str)
        if not isinstance(canvas_data, dict):
            print(f"[DEBUG] ❌ JavaScript→Python エラー: payload is not a JSON object")
            return gr.update(), ""

        people = canvas_data.get('people') or []
        first_person = people[0] if isinstance(people, list) and people and isinstance(people[0], dict) else {}
        print(f"[DEBUG] 🔍 受信データ構造:", {
            'hasPeople': 'people' in canvas_data,
            'peopleCount': len(people),
            'hasHandLeft': bool(first_person.get('hand_left_keypoints_2d')),
            'hasHandRight': bool(first_person.get('hand_right_keypoints_2d')),
            'hasFace': bool(first_person.get('face_keypoints_2d')),
        })
        print(f"[DEBUG] 🎨 Canvas JSON解析成功: keys={list(canvas_data.keys())}")

        # '_t' is removed before the data is stored (the original log calls it
        # a timestamp).
        if '_t' in canvas_data:
            del canvas_data['_t']
            print(f"[DEBUG] 🔄 タイムスタンプ除去完了")

        if 'people' in canvas_data and canvas_data['people']:
            pose_data = canvas_data['people'][0]
            print(f"[DEBUG] 🎯 People形式データ抽出成功")

            if _current_poses is None:
                _current_poses = [{
                    'people': [pose_data],
                    'metadata': {
                        'resolution': canvas_data.get('resolution', [512, 512]),
                    },
                }]
                # Keep the index consistent with the freshly created list.
                _current_frame_index = 0
                print(f"[DEBUG] 🚀 _current_poses初期化完了")
            elif 0 <= _current_frame_index < len(_current_poses):
                frame = _current_poses[_current_frame_index]
                frame['people'] = [pose_data]
                resolution = canvas_data.get('resolution')
                if isinstance(resolution, list):
                    frame.setdefault('metadata', {})['resolution'] = resolution
                print(f"[DEBUG] 🎯 フレーム{_current_frame_index}のpeopleデータ更新完了")
            else:
                _current_poses.append({
                    'people': [pose_data],
                    'metadata': {
                        'resolution': canvas_data.get('resolution', [512, 512]),
                    },
                })
                # Point at the frame just added; a stale index would make the
                # lookup below fail and the update would be lost.
                _current_frame_index = len(_current_poses) - 1
                print(f"[DEBUG] 🎯 新フレーム追加完了")

        if not _current_poses or not 0 <= _current_frame_index < len(_current_poses):
            print(f"[DEBUG] ❌ JavaScript→Python エラー: no active frame to display")
            return gr.update(), ""

        # Convert the active frame to the bodies/hands/faces display format.
        current_frame_data = _current_poses[_current_frame_index]
        display_data = {
            'bodies': {'candidate': [], 'subset': []},
            'hands': [],
            'faces': [],
            'resolution': current_frame_data.get('metadata', {}).get('resolution', [512, 512]),
        }

        frame_people = current_frame_data['people']
        if frame_people and frame_people[0]:
            person = frame_people[0]

            if 'pose_keypoints_2d' in person:
                pose_keypoints = person['pose_keypoints_2d'] or []
                print(f"[DEBUG] 🔄 pose_keypoints length: {len(pose_keypoints)}")
                # Flat [x, y, conf, ...] triplets; a trailing partial triplet
                # is ignored.
                display_data['bodies']['candidate'] = [
                    [pose_keypoints[i], pose_keypoints[i + 1], pose_keypoints[i + 2], 0]
                    for i in range(0, len(pose_keypoints) - 2, 3)
                ]

            # JSON null arrives as None; treat it like an empty list.
            left_hand = person.get('hand_left_keypoints_2d') or []
            right_hand = person.get('hand_right_keypoints_2d') or []
            face_data = person.get('face_keypoints_2d') or []

            display_data['hands'] = [left_hand, right_hand]
            display_data['faces'] = [face_data] if face_data else []

            print(f"[DEBUG] 🫳 Hand data: left={len(left_hand)}, right={len(right_hand)}")
            print(f"[DEBUG] 😊 Face data: {len(face_data)}")

        print(f"[DEBUG] ✅ refs互換データ更新完了: frames={len(_current_poses)}")
        print(f"[DEBUG] 📊 表示用データ: candidates={len(display_data['bodies']['candidate'])}")

        return display_data, ""

    except json.JSONDecodeError as e:
        print(f"[DEBUG] ❌ JavaScript→Python JSONパースエラー: {e}")
        return gr.update(), ""
    except Exception as e:
        # UI callback boundary: log and leave the components unchanged.
        print(f"[DEBUG] ❌ JavaScript→Python エラー: {e}")
        return gr.update(), ""
    finally:
        # Always release the guard, whichever path returned above.
        old_flag = _is_updating
        _is_updating = False
        print(f"[DEBUG] 🔓 データ更新処理完了 - フラグ解除: {old_flag} → {_is_updating} (timestamp={update_timestamp})")
|
|
|
|
|
def on_json_upload(file):
    """Load a pose JSON file in either "people" or "bodies" format.

    * A non-empty JSON list is treated as people format: the first frame's
      first person is loaded, with ``canvas_width``/``canvas_height``
      (default 512) as the resolution.
    * Anything else must pass ``validate_pose_json`` (bodies format) and is
      converted to a people entry.

    In both cases the module-level frame list is replaced by a single frame
    and the frame index is reset to 0.

    Args:
        file: Value of the file component (path string or object with a
            ``name`` attribute), or ``None`` when the upload was cleared.

    Returns:
        A 5-tuple for the outputs this callback is wired to
        (``output_json``, ``pose_data``, ``js_executor``, ``canvas_width``,
        ``canvas_height``).  Every failure path returns
        ``(None, {}, no-op, no-op, no-op)``.
    """
    global _current_poses, _current_frame_index

    def _no_change():
        # Always five values: the event has five outputs.
        return None, {}, gr.update(), gr.update(), gr.update()

    if file is None:
        return _no_change()

    try:
        json_path = getattr(file, "name", file)
        with open(json_path, 'r', encoding='utf-8') as f:
            loaded_data = json.load(f)

        # ---- people format: a list of frames -------------------------------
        if isinstance(loaded_data, list) and len(loaded_data) > 0:
            frame_data = loaded_data[0]
            if not (isinstance(frame_data, dict) and frame_data.get('people')):
                notify_error("無効なpeople形式データです")
                return _no_change()

            person_data = frame_data['people'][0]
            # Validate the size before touching global state.
            canvas_width = int(frame_data.get('canvas_width', 512))
            canvas_height = int(frame_data.get('canvas_height', 512))

            _current_poses = [{
                'people': [person_data],
                'metadata': {
                    'resolution': [canvas_width, canvas_height],
                },
            }]
            _current_frame_index = 0

            # Send both representations to the browser (hybrid payload).
            bodies_data = convert_people_to_bodies_format(person_data, [canvas_width, canvas_height])
            display_data = {
                'people': [person_data],
                'bodies': bodies_data['bodies'],
                'hands': bodies_data['hands'],
                'faces': bodies_data['faces'],
                'resolution': [canvas_width, canvas_height],
                'metadata': {'resolution': [canvas_width, canvas_height]},
            }

            print(f"[DEBUG] ✅ people形式JSON読み込み完了(ハイブリッド形式)")
            notify_success("people形式JSONファイルを読み込みました")

            js_code = f"setTimeout(() => updateCanvasResolution({canvas_width}, {canvas_height}), 100);"
            return (
                display_data,
                display_data,
                gr.update(value=f"<script>{js_code}</script>"),
                gr.update(value=canvas_width),
                gr.update(value=canvas_height),
            )

        # ---- bodies format: a single dict -----------------------------------
        if not validate_pose_json(loaded_data):
            notify_error("無効なポーズデータフォーマットです")
            return _no_change()

        # Any 'metadata' in the file is dropped before the data is used.
        loaded_data.pop('metadata', None)

        hands = loaded_data.get('hands')
        faces = loaded_data.get('faces')
        person_data = {
            "pose_keypoints_2d": [],
            "hand_left_keypoints_2d": hands[0] if hands else [],
            "hand_right_keypoints_2d": hands[1] if hands and len(hands) > 1 else [],
            "face_keypoints_2d": faces[0] if faces else [],
        }
        for candidate in loaded_data['bodies'].get('candidate') or []:
            if candidate and len(candidate) >= 2:
                # Candidates without a third value get confidence 1.0.
                confidence = candidate[2] if len(candidate) > 2 else 1.0
                person_data["pose_keypoints_2d"].extend([candidate[0], candidate[1], confidence])

        resolution = loaded_data.get('resolution', [512, 512])
        _current_poses = [{
            'people': [person_data],
            'metadata': {
                'resolution': resolution,
            },
        }]
        _current_frame_index = 0

        # Keep the bodies data and add the converted people entry.
        hybrid_data = loaded_data.copy()
        hybrid_data['people'] = [person_data]

        print(f"[DEBUG] ✅ bodies形式JSON読み込み・変換完了(ハイブリッド形式)")
        notify_success("bodies形式JSONファイルを読み込みました(people形式に変換)")

        try:
            w, h = int(resolution[0]), int(resolution[1])
        except (TypeError, ValueError, IndexError, KeyError):
            w, h = 512, 512
        js_code = f"setTimeout(() => updateCanvasResolution({w}, {h}), 100);"
        return (
            hybrid_data,
            hybrid_data,
            gr.update(value=f"<script>{js_code}</script>"),
            gr.update(value=w),
            gr.update(value=h),
        )

    except json.JSONDecodeError as e:
        notify_error(f"JSONパースエラー: {str(e)}")
        return _no_change()
    except Exception as e:
        # UI callback boundary: report the problem instead of raising.
        notify_error(f"ファイル読み込みエラー: {str(e)}")
        return _no_change()
|
|
|
|
|
def convert_people_to_bodies_format(person_data, resolution):
    """Convert one "people"-format person into the bodies/hands/faces format.

    Args:
        person_data: Dict that may contain ``pose_keypoints_2d`` (flat
            ``[x, y, conf, ...]`` list), ``hand_left_keypoints_2d``,
            ``hand_right_keypoints_2d`` and ``face_keypoints_2d``.  Missing
            keys and ``None`` values are treated as empty lists.
        resolution: Value stored unchanged under ``'resolution'``.

    Returns:
        ``{'bodies': {'candidate': [[x, y, conf, 0], ...], 'subset': []},
        'hands': [left, right], 'faces': [face] or [], 'resolution': ...}``.
        A trailing incomplete keypoint triplet is ignored.
    """
    def _keypoints(key):
        # JSON null arrives as None; treat it like a missing key.
        value = person_data.get(key)
        return [] if value is None else value

    pose_keypoints = _keypoints('pose_keypoints_2d')
    candidates = [
        [pose_keypoints[i], pose_keypoints[i + 1], pose_keypoints[i + 2], 0]
        for i in range(0, len(pose_keypoints) - 2, 3)
    ]

    face_data = _keypoints('face_keypoints_2d')

    return {
        'bodies': {'candidate': candidates, 'subset': []},
        'hands': [
            _keypoints('hand_left_keypoints_2d'),
            _keypoints('hand_right_keypoints_2d'),
        ],
        'faces': [face_data] if face_data else [],
        'resolution': resolution,
    }
|
|
|
|
|
def validate_pose_json(data):
    """Check that ``data`` looks like bodies-format pose JSON.

    Valid data is a dict whose ``'bodies'`` entry is itself a dict containing
    at least one of the keys ``'candidate'`` or ``'subset'``.

    Returns:
        ``True`` when the structure is acceptable, otherwise ``False``.
    """
    if not isinstance(data, dict) or 'bodies' not in data:
        return False

    body_section = data['bodies']
    if not isinstance(body_section, dict):
        return False

    return 'candidate' in body_section or 'subset' in body_section
|
|
|
|
|
|
|
|
js_executor = gr.HTML(visible=False, elem_id="js_executor") |
|
|
|
|
|
|
|
|
input_image.change( |
|
|
fn=on_image_upload, |
|
|
inputs=[input_image], |
|
|
outputs=[output_json, pose_data, js_executor, canvas_width, canvas_height] |
|
|
) |
|
|
|
|
|
|
|
|
pose_data.change( |
|
|
fn=None, |
|
|
inputs=pose_data, |
|
|
outputs=[], |
|
|
js="(pose_data) => { if (window.gradioCanvasUpdate) { window.gradioCanvasUpdate(JSON.stringify(pose_data)); } }" |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
draw_hand.change( |
|
|
fn=on_display_settings_change, |
|
|
inputs=[draw_hand, draw_face, edit_mode], |
|
|
outputs=[js_executor] |
|
|
) |
|
|
|
|
|
draw_face.change( |
|
|
fn=on_display_settings_change, |
|
|
inputs=[draw_hand, draw_face, edit_mode], |
|
|
outputs=[js_executor] |
|
|
) |
|
|
|
|
|
edit_mode.change( |
|
|
fn=on_display_settings_change, |
|
|
inputs=[draw_hand, draw_face, edit_mode], |
|
|
outputs=[js_executor] |
|
|
) |
|
|
|
|
|
|
|
|
template_update_btn.click( |
|
|
fn=load_template_pose, |
|
|
inputs=[template_dropdown], |
|
|
outputs=[output_json, pose_data, js_executor, canvas_width, canvas_height] |
|
|
) |
|
|
|
|
|
|
|
|
download_image_btn.click( |
|
|
fn=export_image, |
|
|
inputs=[pose_data, draw_hand, draw_face, canvas_width, canvas_height], |
|
|
outputs=[download_image_file] |
|
|
) |
|
|
|
|
|
download_json_btn.click( |
|
|
fn=export_json, |
|
|
inputs=[pose_data], |
|
|
outputs=[download_json_file] |
|
|
) |
|
|
|
|
|
|
|
|
js_pose_update.change( |
|
|
fn=on_js_pose_update, |
|
|
inputs=[js_pose_update], |
|
|
outputs=[pose_data, js_pose_update] |
|
|
) |
|
|
|
|
|
|
|
|
json_upload.change( |
|
|
fn=on_json_upload, |
|
|
inputs=[json_upload], |
|
|
outputs=[output_json, pose_data, js_executor, canvas_width, canvas_height] |
|
|
) |
|
|
|
|
|
|
|
|
canvas_update_btn.click( |
|
|
fn=on_canvas_size_update, |
|
|
inputs=[canvas_width, canvas_height], |
|
|
outputs=[js_executor] |
|
|
) |
|
|
|
|
|
return demo |
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: either download the models and exit, or launch the UI.
    import argparse
    import sys

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--setup-models",
        action="store_true",
        help="Download required models into ./models and exit",
    )
    # Unknown arguments are tolerated and ignored.
    args, _ = parser.parse_known_args()

    if args.setup_models:
        # The setup helper's return value becomes the process exit status.
        sys.exit(_download_models_setup())

    demo = main()
    demo.launch()
|
|
|