Spaces:
Sleeping
Sleeping
| # app.py - الكود النهائي المعدل ليتوافق مع بنية measurements.json الحقيقية | |
| import os | |
| import json | |
| import gradio as gr | |
| import base64 | |
| from PIL import Image | |
| import io | |
| import requests | |
| # ============================================================================== | |
| # 1. إعدادات الـ API | |
| # ============================================================================== | |
| API_URL_BASE = "https://BaseerAI-baseer-server.hf.space" | |
| API_START_SESSION_URL = f"{API_URL_BASE}/start_session" | |
| API_RUN_STEP_URL = f"{API_URL_BASE}/run_step" | |
| API_END_SESSION_URL = f"{API_URL_BASE}/end_session" | |
| EXAMPLES_DIR = "examples" | |
| # ============================================================================== | |
| # 2. الدوال المساعدة والدوال التي تتصل بالـ API | |
| # ============================================================================== | |
| def image_to_base64(pil_image): | |
| buffered = io.BytesIO() | |
| pil_image.save(buffered, format="JPEG") | |
| return base64.b64encode(buffered.getvalue()).decode('utf-8') | |
| def base64_to_image(b64_string): | |
| img_data = base64.b64decode(b64_string) | |
| return Image.open(io.BytesIO(img_data)) | |
| def start_new_session(): | |
| """يبدأ جلسة جديدة على الخادم ويعيد معرف الجلسة وأمر إظهار الواجهة.""" | |
| print("Requesting to start a new session...") | |
| try: | |
| response = requests.post(API_START_SESSION_URL, timeout=30) | |
| response.raise_for_status() | |
| session_data = response.json() | |
| session_id = session_data.get("session_id") | |
| if not session_id: | |
| raise gr.Error("لم يتمكن الخادم من بدء جلسة جديدة.") | |
| status_message = f"✅ تم بدء جلسة جديدة بنجاح. أنت الآن متصل بالخادم." | |
| print(f"Session started successfully: {session_id}") | |
| return session_id, status_message, gr.update(visible=True) | |
| except requests.exceptions.RequestException as e: | |
| error_message = f"فشل الاتصال بخادم الـ API: {e}" | |
| print(error_message) | |
| raise gr.Error(error_message) | |
| def handle_end_session(session_id): | |
| """ينهي الجلسة الحالية على الخادم.""" | |
| if not session_id: | |
| return "لا توجد جلسة نشطة لإنهائها.", None, gr.update(visible=False) | |
| print(f"Requesting to end session: {session_id}") | |
| try: | |
| response = requests.post(f"{API_END_SESSION_URL}?session_id={session_id}", timeout=30) | |
| response.raise_for_status() | |
| status_message = f"🔌 تم إنهاء الجلسة بنجاح. انقطع الاتصال بالخادم." | |
| print(status_message) | |
| return status_message, None, gr.update(visible=False) | |
| except requests.exceptions.RequestException as e: | |
| error_message = f"حدث خطأ أثناء إنهاء الجلسة: {e}" | |
| print(error_message) | |
| raise gr.Error(error_message) | |
| # --- [تم تعديل هذه الدالة] --- | |
| def run_single_frame_via_api(session_id, rgb_image_path, measurements_path): | |
| if not session_id: | |
| raise gr.Error("لا توجد جلسة نشطة. يرجى بدء جلسة جديدة أولاً.") | |
| if not (rgb_image_path and measurements_path): | |
| raise gr.Error("الرجاء توفير الصورة الأمامية وملف القياسات على الأقل.") | |
| try: | |
| rgb_image_pil = Image.open(rgb_image_path).convert("RGB") | |
| with open(measurements_path, 'r') as f: | |
| measurements_dict = json.load(f) | |
| image_b64 = image_to_base64(rgb_image_pil) | |
| # --- [التعديل الرئيسي هنا] --- | |
| # بناء الحمولة (Payload) بناءً على بنية الملف الحقيقية | |
| payload = { | |
| "session_id": session_id, | |
| "image_b64": image_b64, | |
| "measurements": { | |
| "pos_global": measurements_dict.get('pos_global', [0.0, 0.0]), | |
| "theta": measurements_dict.get('theta', 0.0), | |
| "speed": measurements_dict.get('speed', 0.0), | |
| "target_point": measurements_dict.get('target_point', [0.0, 100.0]) | |
| } | |
| } | |
| print(f"Sending run_step request for session: {session_id}") | |
| response = requests.post(API_RUN_STEP_URL, json=payload, timeout=60) | |
| response.raise_for_status() | |
| api_result = response.json() | |
| dashboard_b64 = api_result.get("dashboard_b64") | |
| control_commands = api_result.get("control_commands", {}) | |
| if not dashboard_b64: | |
| raise gr.Error("لم يتم إرجاع صورة لوحة التحكم من الـ API.") | |
| output_image = base64_to_image(dashboard_b64) | |
| return output_image, control_commands | |
| except requests.exceptions.RequestException as e: | |
| raise gr.Error(f"حدث خطأ أثناء الاتصال بالـ API: {e}") | |
| except Exception as e: | |
| raise gr.Error(f"حدث خطأ غير متوقع: {e}") | |
| # ============================================================================== | |
| # 4. تعريف واجهة Gradio | |
| # ============================================================================== | |
| with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"), css=".gradio-container {max-width: 100% !important;}") as demo: | |
| session_id_state = gr.State(value=None) | |
| gr.Markdown("# 🚗 واجهة التحكم لنظام Baseer للقيادة الذاتية") | |
| gr.Markdown("هذه الواجهة تعمل كـ **عميل (Client)** يتصل بـ **[Baseer Self-Driving API](https://huggingface.co/spaces/BaseerAI/baseer-server)** لمعالجة البيانات.") | |
| gr.Markdown("---") | |
| with gr.Group(): | |
| gr.Markdown("## ⚙️ التحكم بالجلسة") | |
| with gr.Row(): | |
| start_session_button = gr.Button("🟢 بدء جلسة جديدة", variant="primary") | |
| end_session_button = gr.Button("🔴 إنهاء الجلسة الحالية") | |
| status_textbox = gr.Textbox(label="حالة الجلسة", interactive=False, value="في انتظار بدء الجلسة...") | |
| gr.Markdown("") | |
| with gr.Group(visible=False) as main_controls_group: | |
| with gr.Row(): | |
| # -- العمود الأيسر: الإعدادات والمدخلات -- | |
| with gr.Column(scale=2): | |
| with gr.Group(): | |
| gr.Markdown("### 🗂️ ارفع ملفات السيناريو") | |
| api_rgb_image_path = gr.Image(type="filepath", label="صورة الكاميرا الأمامية (RGB)") | |
| api_measurements_path = gr.File(label="ملف القياسات (JSON)", type="filepath") | |
| # --- [تم حذف gr.JSON من هنا] --- | |
| api_run_button = gr.Button("🚀 أرسل البيانات للمعالجة", variant="primary") | |
| gr.Markdown("") | |
| # ... (داخل with gr.Group(visible=False) as main_controls_group:) | |
| with gr.Group(): | |
| gr.Markdown("### ✨ أمثلة جاهزة") | |
| # التحقق من وجود مجلد الأمثلة لتجنب الأخطاء | |
| if os.path.isdir(EXAMPLES_DIR) and \ | |
| os.path.exists(os.path.join(EXAMPLES_DIR, "sample1", "measurements.json")) and \ | |
| os.path.exists(os.path.join(EXAMPLES_DIR, "sample2", "measurements.json")): | |
| # قراءة بيانات المثال الأول | |
| with open(os.path.join(EXAMPLES_DIR, "sample1", "measurements.json"), 'r') as f: | |
| data1 = json.load(f) | |
| # قراءة بيانات المثال الثاني | |
| with open(os.path.join(EXAMPLES_DIR, "sample2", "measurements.json"), 'r') as f: | |
| data2 = json.load(f) | |
| # --- [تعديل] 1: إنشاء مكونات جديدة لكل البيانات المستخدمة --- | |
| # سيتم استخدام عناوينها كعناوين للأعمدة في الجدول | |
| with gr.Column(visible=False): | |
| example_speed = gr.Textbox(label="السرعة (m/s)") | |
| example_theta = gr.Textbox(label="الاتجاه (Theta)") | |
| example_pos = gr.Textbox(label="الموقع العام") | |
| example_target = gr.Textbox(label="النقطة المستهدفة") | |
| gr.Examples( | |
| examples=[ | |
| # --- [تعديل] 2: بناء البيانات للأعمدة الجديدة --- | |
| [ | |
| # المدخلات الأساسية | |
| os.path.join(EXAMPLES_DIR, "sample1", "rgb.jpg"), | |
| os.path.join(EXAMPLES_DIR, "sample1", "measurements.json"), | |
| # البيانات للعرض فقط | |
| f"{data1.get('speed', 0.0):.2f}", | |
| f"{data1.get('theta', 0.0):.2f}", | |
| str(data1.get('pos_global', 'N/A')), | |
| str(data1.get('target_point', 'N/A')) | |
| ], | |
| [ | |
| # المدخلات الأساسية | |
| os.path.join(EXAMPLES_DIR, "sample2", "rgb.jpg"), | |
| os.path.join(EXAMPLES_DIR, "sample2", "measurements.json"), | |
| # البيانات للعرض فقط | |
| f"{data2.get('speed', 0.0):.2f}", | |
| f"{data2.get('theta', 0.0):.2f}", | |
| str(data2.get('pos_global', 'N/A')), | |
| str(data2.get('target_point', 'N/A')) | |
| ] | |
| ], | |
| # --- [تعديل] 3: تحديث المدخلات لتشمل كل الأعمدة --- | |
| inputs=[ | |
| api_rgb_image_path, | |
| api_measurements_path, | |
| example_speed, | |
| example_theta, | |
| example_pos, | |
| example_target | |
| ], | |
| label="اختر سيناريو اختبار", | |
| ) | |
| else: | |
| gr.Markdown("لم يتم العثور على مجلد الأمثلة (`examples`) أو محتوياته.") | |
| # -- العمود الأيمن: المخرجات -- | |
| with gr.Column(scale=3): | |
| with gr.Group(): | |
| gr.Markdown("### 📊 النتائج من الخادم") | |
| api_output_image = gr.Image(label="لوحة التحكم المرئية (من الـ API)", type="pil", interactive=False, height=600) | |
| api_control_json = gr.JSON(label="أوامر التحكم (من الـ API)") | |
| # --- ربط منطق الواجهة --- | |
| start_session_button.click( | |
| fn=start_new_session, | |
| inputs=None, | |
| outputs=[session_id_state, status_textbox, main_controls_group] | |
| ) | |
| end_session_button.click( | |
| fn=handle_end_session, | |
| inputs=[session_id_state], | |
| outputs=[status_textbox, session_id_state, main_controls_group] | |
| ) | |
| # --- [تم تعديل هذا الجزء] --- | |
| api_run_button.click( | |
| fn=run_single_frame_via_api, | |
| inputs=[session_id_state, api_rgb_image_path, api_measurements_path], # تم حذف target_point_list | |
| outputs=[api_output_image, api_control_json], | |
| ) | |
| if __name__ == "__main__": | |
| demo.queue().launch(debug=True) |