|
|
|
|
|
|
|
|
import os |
|
|
import json |
|
|
import gradio as gr |
|
|
import base64 |
|
|
from PIL import Image |
|
|
import io |
|
|
import requests |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
API_URL_BASE = "https://BaseerAI-baseer-server.hf.space" |
|
|
API_START_SESSION_URL = f"{API_URL_BASE}/start_session" |
|
|
API_RUN_STEP_URL = f"{API_URL_BASE}/run_step" |
|
|
API_END_SESSION_URL = f"{API_URL_BASE}/end_session" |
|
|
|
|
|
EXAMPLES_DIR = "examples" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def image_to_base64(pil_image): |
|
|
buffered = io.BytesIO() |
|
|
pil_image.save(buffered, format="JPEG") |
|
|
return base64.b64encode(buffered.getvalue()).decode('utf-8') |
|
|
|
|
|
def base64_to_image(b64_string): |
|
|
img_data = base64.b64decode(b64_string) |
|
|
return Image.open(io.BytesIO(img_data)) |
|
|
|
|
|
def start_new_session(): |
|
|
print("Requesting to start a new session...") |
|
|
try: |
|
|
response = requests.post(API_START_SESSION_URL, timeout=30) |
|
|
response.raise_for_status() |
|
|
session_data = response.json() |
|
|
session_id = session_data.get("session_id") |
|
|
if not session_id: |
|
|
raise gr.Error("لم يتمكن الخادم من بدء جلسة جديدة.") |
|
|
status_message = f"✅ تم بدء جلسة جديدة بنجاح. أنت الآن متصل بالخادم." |
|
|
print(f"Session started successfully: {session_id}") |
|
|
return session_id, status_message, gr.update(visible=True) |
|
|
except requests.exceptions.RequestException as e: |
|
|
error_message = f"فشل الاتصال بخادم الـ API: {e}" |
|
|
print(error_message) |
|
|
raise gr.Error(error_message) |
|
|
|
|
|
def handle_end_session(session_id): |
|
|
if not session_id: |
|
|
return "لا توجد جلسة نشطة لإنهائها.", None, gr.update(visible=False) |
|
|
print(f"Requesting to end session: {session_id}") |
|
|
try: |
|
|
response = requests.post(f"{API_END_SESSION_URL}?session_id={session_id}", timeout=30) |
|
|
response.raise_for_status() |
|
|
status_message = f"🔌 تم إنهاء الجلسة بنجاح. انقطع الاتصال بالخادم." |
|
|
print(status_message) |
|
|
return status_message, None, gr.update(visible=False) |
|
|
except requests.exceptions.RequestException as e: |
|
|
error_message = f"حدث خطأ أثناء إنهاء الجلسة: {e}" |
|
|
print(error_message) |
|
|
raise gr.Error(error_message) |
|
|
|
|
|
def run_single_frame_via_api(session_id, rgb_image_path, measurements_path): |
|
|
if not session_id: |
|
|
raise gr.Error("لا توجد جلسة نشطة. يرجى بدء جلسة جديدة أولاً.") |
|
|
if not (rgb_image_path and measurements_path): |
|
|
raise gr.Error("الرجاء توفير الصورة الأمامية وملف القياسات على الأقل.") |
|
|
try: |
|
|
rgb_image_pil = Image.open(rgb_image_path).convert("RGB") |
|
|
|
|
|
if not isinstance(measurements_path, str) or not os.path.exists(measurements_path): |
|
|
raise gr.Error(f"مسار ملف القياسات غير صالح أو مفقود: {measurements_path}") |
|
|
with open(measurements_path, 'r') as f: |
|
|
measurements_dict = json.load(f) |
|
|
image_b64 = image_to_base64(rgb_image_pil) |
|
|
payload = { |
|
|
"session_id": session_id, "image_b64": image_b64, |
|
|
"measurements": { |
|
|
"pos_global": measurements_dict.get('pos_global', [0.0, 0.0]), |
|
|
"theta": measurements_dict.get('theta', 0.0), |
|
|
"speed": measurements_dict.get('speed', 0.0), |
|
|
"target_point": measurements_dict.get('target_point', [0.0, 100.0]) |
|
|
} |
|
|
} |
|
|
print(f"Sending run_step request for session: {session_id}") |
|
|
response = requests.post(API_RUN_STEP_URL, json=payload, timeout=60) |
|
|
response.raise_for_status() |
|
|
api_result = response.json() |
|
|
dashboard_b64 = api_result.get("dashboard_b64") |
|
|
control_commands = api_result.get("control_commands", {}) |
|
|
if not dashboard_b64: |
|
|
raise gr.Error("لم يتم إرجاع صورة لوحة التحكم من الـ API.") |
|
|
output_image = base64_to_image(dashboard_b64) |
|
|
return output_image, control_commands |
|
|
except FileNotFoundError: |
|
|
raise gr.Error(f"لم يتم العثور على الملف: {measurements_path}") |
|
|
except requests.exceptions.RequestException as e: |
|
|
raise gr.Error(f"حدث خطأ أثناء الاتصال بالـ API: {e}") |
|
|
except Exception as e: |
|
|
raise gr.Error(f"حدث خطأ غير متوقع: {e}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
FULL_EXAMPLES_DATA = [] |
|
|
|
|
|
SAMPLES_FOR_DISPLAY = [] |
|
|
|
|
|
if os.path.isdir(EXAMPLES_DIR): |
|
|
try: |
|
|
for i in range(1, 5): |
|
|
sample_name = f"sample{i}" |
|
|
img_path = os.path.join(EXAMPLES_DIR, sample_name, "rgb.jpg") |
|
|
json_path = os.path.join(EXAMPLES_DIR, sample_name, "measurements.json") |
|
|
|
|
|
with open(json_path, 'r') as f: |
|
|
data = json.load(f) |
|
|
|
|
|
|
|
|
FULL_EXAMPLES_DATA.append({ |
|
|
"img_path": img_path, |
|
|
"json_path": json_path |
|
|
}) |
|
|
|
|
|
|
|
|
SAMPLES_FOR_DISPLAY.append([ |
|
|
img_path, |
|
|
f"{data.get('speed', 0.0):.2f}", |
|
|
f"{data.get('theta', 0.0):.2f}", |
|
|
str(data.get('pos_global', 'N/A')), |
|
|
str(data.get('target_point', 'N/A')) |
|
|
]) |
|
|
except FileNotFoundError: |
|
|
print(f"تحذير: لم يتم العثور على بعض ملفات الأمثلة في مجلد '{EXAMPLES_DIR}'.") |
|
|
FULL_EXAMPLES_DATA = [] |
|
|
SAMPLES_FOR_DISPLAY = [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CUSTOM_CSS = ".gradio-container {max-width: 100% !important;}" |
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"), css=CUSTOM_CSS) as demo: |
|
|
session_id_state = gr.State(value=None) |
|
|
|
|
|
gr.Markdown("# 🚗 Baseer Self-Driving System Dashboard") |
|
|
gr.Markdown("This interface acts as a **client** that connects to the **[Baseer Self-Driving API](https://huggingface.co/spaces/BaseerAI/Baseer_Server)** for data processing.") |
|
|
gr.Markdown("---") |
|
|
|
|
|
with gr.Group(): |
|
|
gr.Markdown("## ⚙️ Session Control") |
|
|
with gr.Row(): |
|
|
start_session_button = gr.Button("🟢 Start New Session", variant="primary") |
|
|
end_session_button = gr.Button("🔴 End Current Session") |
|
|
status_textbox = gr.Textbox(label="Session Status", interactive=False, value="Waiting to start session...") |
|
|
|
|
|
gr.Markdown("") |
|
|
|
|
|
with gr.Group(visible=False) as main_controls_group: |
|
|
with gr.Row(equal_height=False): |
|
|
with gr.Column(scale=2): |
|
|
with gr.Group(): |
|
|
gr.Markdown("### 🗂️ Upload Scenario Files") |
|
|
api_rgb_image_path = gr.Image(type="filepath", label="(RGB Image)") |
|
|
api_measurements_path = gr.File(label="(Measurements JSON)", type="filepath") |
|
|
api_run_button = gr.Button("🚀 Send Data for Processing", variant="primary") |
|
|
|
|
|
with gr.Group(): |
|
|
gr.Markdown("### ✨ Preloaded Examples") |
|
|
if SAMPLES_FOR_DISPLAY: |
|
|
def on_example_select(evt: gr.SelectData): |
|
|
selected_example = FULL_EXAMPLES_DATA[evt.index] |
|
|
return gr.update(value=selected_example["img_path"]), gr.update(value=selected_example["json_path"]) |
|
|
|
|
|
with gr.Column(visible=False): |
|
|
example_image = gr.Image(label="Image", type="filepath") |
|
|
example_speed = gr.Textbox(label="Speed") |
|
|
example_theta = gr.Textbox(label="Direction (theta)") |
|
|
example_position = gr.Textbox(label="Position") |
|
|
example_target = gr.Textbox(label="Target Point") |
|
|
|
|
|
example_components = [example_image, example_speed, example_theta, example_position, example_target] |
|
|
|
|
|
if SAMPLES_FOR_DISPLAY: |
|
|
example_dataset = gr.Dataset( |
|
|
components=example_components, |
|
|
samples=SAMPLES_FOR_DISPLAY, |
|
|
label="Select Test Scenario" |
|
|
) |
|
|
|
|
|
example_dataset.select( |
|
|
fn=on_example_select, |
|
|
inputs=None, |
|
|
outputs=[api_rgb_image_path, api_measurements_path] |
|
|
) |
|
|
else: |
|
|
gr.Markdown("No valid examples found.") |
|
|
|
|
|
with gr.Column(scale=3): |
|
|
with gr.Group(): |
|
|
gr.Markdown("### 📊 Results from API") |
|
|
api_output_image = gr.Image(label="Dashboard Image (from API)", type="pil", interactive=False, height=600) |
|
|
api_control_json = gr.JSON(label="Control Commands (from API)") |
|
|
|
|
|
|
|
|
start_session_button.click(fn=start_new_session, inputs=None, outputs=[session_id_state, status_textbox, main_controls_group]) |
|
|
end_session_button.click(fn=handle_end_session, inputs=[session_id_state], outputs=[status_textbox, session_id_state, main_controls_group]) |
|
|
api_run_button.click(fn=run_single_frame_via_api, inputs=[session_id_state, api_rgb_image_path, api_measurements_path], outputs=[api_output_image, api_control_json]) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
demo.queue().launch(debug=True) |