Spaces:
Running
Running
File size: 18,267 Bytes
255a798 3646ccf 255a798 f4d9469 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 | import gradio as gr
import time
import random
# Vehicle state for dynamic responses
class VehicleState:
def __init__(self):
self.engine_running = True
self.rpm = 800 # Idle RPM
self.speed = 0
self.coolant_temp = 90 # °C
self.throttle_pos = 0
self.maf_rate = 250 # 2.5 g/s idle (scaled by 100)
self.fuel_level = 75 # 75%
self.update_counter = 0
# ELM327 settings
self.echo_on = True
self.linefeed_on = True
self.headers_on = False
self.spaces_on = True
self.protocol = "AUTO"
self.voltage = 13.05
def update(self):
"""Update vehicle state with realistic variations"""
self.update_counter += 1
if not self.engine_running:
return
# RPM varies slightly at idle
self.rpm = 800 + ((self.update_counter * 7) % 100) - 50
# Coolant temp gradually increases when engine running
if self.coolant_temp < 90:
if self.update_counter % 10 == 0:
self.coolant_temp += 1
# Simulate some throttle/speed variation
if self.update_counter % 50 == 0:
self.throttle_pos = (self.update_counter // 5) % 30
self.speed = self.throttle_pos * 2
# MAF follows throttle roughly
self.maf_rate = 250 + (self.throttle_pos * 5)
# Global vehicle state
vehicle_state = VehicleState()
# Pool of possible DTCs for testing
# Format: (code_bytes, description)
DTC_POOL = [
("01 71", "P0171 - System Too Lean (Bank 1)"),
("03 00", "P0300 - Random/Multiple Cylinder Misfire Detected"),
("04 42", "P0442 - EVAP Emission Control System Leak Detected (small leak)"),
("01 31", "P0131 - O2 Sensor Circuit Low Voltage (Bank 1, Sensor 1)"),
("01 33", "P0133 - O2 Sensor Circuit Slow Response (Bank 1, Sensor 1)"),
("01 71", "P0171 - System Too Lean (Bank 1)"),
("01 72", "P0172 - System Too Rich (Bank 1)"),
("02 02", "P0202 - Injector Circuit Malfunction - Cylinder 2"),
("03 20", "P0320 - Ignition/Distributor Engine Speed Input Circuit Malfunction"),
("04 20", "P0420 - Catalyst System Efficiency Below Threshold (Bank 1)"),
("05 00", "P0500 - Vehicle Speed Sensor Malfunction"),
("07 05", "P0705 - Transmission Range Sensor Circuit Malfunction (PRNDL Input)"),
("13 00", "P1300 - Igniter Circuit Malfunction"),
("00 16", "P0016 - Crankshaft Position/Camshaft Position Correlation (Bank 1)"),
("01 28", "P0128 - Coolant Thermostat (Coolant Temp Below Thermostat Regulating Temp)"),
]
# Mode 01 PID table (static responses)
MODE01_PID_TABLE = {
"00": "41 00 BE 3F B8 13", # Supported PIDs 01-20
"01": "41 01 83 07 65 04", # Monitor status (MIL on, 3 DTCs)
"03": "41 03 02 00", # Fuel system status (closed loop)
"06": "41 06 80", # Short term fuel trim
"07": "41 07 80", # Long term fuel trim
"0A": "41 0A B4", # Fuel pressure (540 kPa)
"0B": "41 0B 63", # Intake manifold pressure (99 kPa)
"0E": "41 0E 7C", # Timing advance (14 degrees)
"0F": "41 0F 54", # Intake air temperature (44°C)
"13": "41 13 03", # O2 sensors present
"1C": "41 1C 01", # OBD standard (OBD-II California ARB)
"1F": "41 1F 00 8C", # Run time since engine start (140s)
"20": "41 20 80 01 80 01", # Supported PIDs 21-40
"21": "41 21 00 4B", # Distance with MIL on (75 km)
"33": "41 33 65", # Barometric pressure (101 kPa)
"40": "41 40 40 00 00 00", # Supported PIDs 41-60
"42": "41 42 32 E8", # Control module voltage (13.05V)
"51": "41 51 01", # Fuel Type (Gasoline)
}
# Dynamic PIDs (generated from vehicle state)
DYNAMIC_PIDS = ["04", "05", "0C", "0D", "10", "11", "2F"]
def normalize_command(cmd):
"""Normalize command string (remove spaces, convert to uppercase)"""
return ''.join(cmd.split()).upper()
def format_response(response, add_prompt=True):
"""Format response with ELM327 settings (echo, spaces, prompt)"""
result = ""
if vehicle_state.echo_on:
result += response + "\n"
if vehicle_state.linefeed_on:
result += "\n"
if add_prompt:
result += ">"
return result
def generate_rpm_response():
"""Generate dynamic RPM response (PID 0C)"""
rpm_value = vehicle_state.rpm
encoded = rpm_value * 4
return f"41 0C {(encoded >> 8) & 0xFF:02X} {encoded & 0xFF:02X}"
def generate_speed_response():
"""Generate dynamic speed response (PID 0D)"""
return f"41 0D {vehicle_state.speed:02X}"
def generate_coolant_temp_response():
"""Generate dynamic coolant temperature response (PID 05)"""
encoded = vehicle_state.coolant_temp + 40
return f"41 05 {encoded:02X}"
def generate_engine_load_response():
"""Generate dynamic engine load response (PID 04)"""
if vehicle_state.engine_running:
load = (vehicle_state.throttle_pos * 2 + vehicle_state.rpm // 100) // 3
load = min(load, 100)
encoded = (load * 255) // 100
else:
encoded = 0
return f"41 04 {encoded:02X}"
def generate_maf_response():
"""Generate dynamic MAF response (PID 10)"""
maf = vehicle_state.maf_rate
return f"41 10 {(maf >> 8) & 0xFF:02X} {maf & 0xFF:02X}"
def generate_throttle_response():
"""Generate dynamic throttle position response (PID 11)"""
encoded = (vehicle_state.throttle_pos * 255) // 100
return f"41 11 {encoded:02X}"
def generate_fuel_level_response():
"""Generate dynamic fuel level response (PID 2F)"""
encoded = (vehicle_state.fuel_level * 255) // 100
return f"41 2F {encoded:02X}"
def handle_mode01_pid(pid):
"""Handle Mode 01 PID requests"""
# Check if it's a dynamic PID
if pid in DYNAMIC_PIDS:
if pid == "0C":
return generate_rpm_response()
elif pid == "0D":
return generate_speed_response()
elif pid == "05":
return generate_coolant_temp_response()
elif pid == "04":
return generate_engine_load_response()
elif pid == "10":
return generate_maf_response()
elif pid == "11":
return generate_throttle_response()
elif pid == "2F":
return generate_fuel_level_response()
# Check static PID table
if pid in MODE01_PID_TABLE:
return MODE01_PID_TABLE[pid]
return "NO DATA"
def generate_vin_response():
"""Generate Mode 09 PID 02 response (VIN)"""
vin = "5TDKRKEC7PS142916"
response = "49 02 01"
for char in vin:
response += f" {ord(char):02X}"
return response
def generate_calibration_id_response():
"""Generate Mode 09 PID 04 response (Calibration ID)"""
cal_id = "CAL123456"
response = "49 04 01"
for char in cal_id:
response += f" {ord(char):02X}"
return response
def generate_ecu_name_response():
"""Generate Mode 09 PID 0A response (ECU Name)"""
ecu_name = "ECU_SIM_UNIT"
response = "49 0A 01"
for char in ecu_name:
response += f" {ord(char):02X}"
return response
def handle_at_command(cmd):
"""Handle AT commands"""
global vehicle_state
cmd_upper = cmd.upper()
# ATZ - Reset
if cmd_upper == "ATZ":
vehicle_state = VehicleState()
return "ELM327 v1.5\n\n>"
# AT@1 - Device description
if cmd_upper == "AT@1":
return "OBDSIM ELM327\n\n>"
# ATI - Version ID
if cmd_upper == "ATI":
return "ELM327 v1.5\n\n>"
# ATE0 - Echo off
if cmd_upper == "ATE0":
vehicle_state.echo_on = False
return "OK\n\n>"
# ATE1 - Echo on
if cmd_upper == "ATE1":
vehicle_state.echo_on = True
return "OK\n\n>"
# ATL0 - Linefeed off
if cmd_upper == "ATL0":
vehicle_state.linefeed_on = False
return "OK\n\n>"
# ATL1 - Linefeed on
if cmd_upper == "ATL1":
vehicle_state.linefeed_on = True
return "OK\n\n>"
# ATH0 - Headers off
if cmd_upper == "ATH0":
vehicle_state.headers_on = False
return "OK\n\n>"
# ATH1 - Headers on
if cmd_upper == "ATH1":
vehicle_state.headers_on = True
return "OK\n\n>"
# ATS0 - Spaces off
if cmd_upper == "ATS0":
vehicle_state.spaces_on = False
return "OK\n\n>"
# ATS1 - Spaces on
if cmd_upper == "ATS1":
vehicle_state.spaces_on = True
return "OK\n\n>"
# ATSP - Set Protocol
if cmd_upper.startswith("ATSP"):
protocol_num = cmd_upper[4:] if len(cmd_upper) > 4 else "0"
protocols = {
"0": "AUTO",
"1": "SAE J1850 PWM",
"2": "SAE J1850 VPW",
"3": "ISO 9141-2",
"4": "ISO 14230-4 KWP",
"5": "ISO 14230-4 KWP (fast)",
"6": "ISO 15765-4 CAN (11 bit, 500 kbaud)",
"7": "ISO 15765-4 CAN (29 bit, 500 kbaud)",
"8": "ISO 15765-4 CAN (11 bit, 250 kbaud)",
"9": "ISO 15765-4 CAN (29 bit, 250 kbaud)",
"A": "SAE J1939 CAN",
}
vehicle_state.protocol = protocols.get(protocol_num, "AUTO")
return "OK\n\n>"
# ATDP - Describe Protocol
if cmd_upper == "ATDP":
return f"{vehicle_state.protocol}\n\n>"
# ATDPN - Describe Protocol by Number
if cmd_upper == "ATDPN":
return "6\n\n>" # ISO 15765-4 CAN (11 bit, 500 kbaud)
# ATRV - Read Voltage
if cmd_upper == "ATRV":
return f"{vehicle_state.voltage:.1f}V\n\n>"
# ATWS - Warm Start
if cmd_upper == "ATWS":
return "ELM327 v1.5\n\n>"
# ATD - Set to Defaults
if cmd_upper == "ATD":
vehicle_state = VehicleState()
return "OK\n\n>"
# ATAT - Adaptive Timing
if cmd_upper.startswith("ATAT"):
return "OK\n\n>"
# ATST - Set Timeout
if cmd_upper.startswith("ATST"):
return "OK\n\n>"
# ATMA - Monitor All
if cmd_upper == "ATMA":
return "MONITORING...\n(Press any key to stop)\n\n>"
# ATPC - Protocol Close
if cmd_upper == "ATPC":
return "OK\n\n>"
# Default response for unknown AT commands
return "?\n\n>"
def handle_obd_command(normalized):
"""Handle OBD-II commands"""
# Update vehicle state
vehicle_state.update()
# Mode 03 - Read DTCs
if normalized == "03":
# Randomly select 1-5 DTCs from the pool
num_dtcs = random.randint(1, 5)
selected_dtcs = random.sample(DTC_POOL, min(num_dtcs, len(DTC_POOL)))
# Build response: 43 [DTC codes...]
response = "43"
for dtc_code, _ in selected_dtcs:
response += " " + dtc_code
return response + "\n\n>"
# Mode 07 - Read pending DTCs
if normalized == "07":
# Randomly select 0-2 pending DTCs from the pool
num_pending = random.randint(0, 2)
if num_pending == 0:
# No pending DTCs
response = "47"
else:
selected_pending = random.sample(DTC_POOL, min(num_pending, len(DTC_POOL)))
response = "47"
for dtc_code, _ in selected_pending:
response += " " + dtc_code
return response + "\n\n>"
# Mode 04 - Clear DTCs
if normalized == "04":
return "44\n\n>"
# Mode 09 - Vehicle Information
if len(normalized) >= 4 and normalized[:2] == "09":
mode09_pid = normalized[2:4]
if mode09_pid == "00":
# Supported PIDs
response = "49 00 54 40 00 00"
elif mode09_pid == "02":
# VIN
response = generate_vin_response()
elif mode09_pid == "04":
# Calibration ID
response = generate_calibration_id_response()
elif mode09_pid == "0A":
# ECU Name
response = generate_ecu_name_response()
else:
response = "NO DATA"
return response + "\n\n>"
# Mode 01 - Show current data
if len(normalized) >= 4 and normalized[:2] == "01":
pid = normalized[2:4]
response = handle_mode01_pid(pid)
return response + "\n\n>"
return "NO DATA\n\n>"
def send_elm327_command(command):
"""
Send a command to the ELM327 OBD-II adapter and get the response.
Args:
command (str): ELM327 command to send (e.g., 'ATZ', '01 0D')
Returns:
str: The response from the ELM327 adapter
"""
if not command or not command.strip():
return ">"
# Normalize command
normalized = normalize_command(command.strip())
# Handle AT commands
if normalized.startswith("AT"):
return handle_at_command(normalized)
# Handle OBD commands
return handle_obd_command(normalized)
def get_system_status():
"""
Get system status including IP address, network status, uptime, and memory.
Returns:
str: System status information
"""
import json
import socket
import psutil
import platform
# Get IP address
try:
# Get the default route interface IP
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip_address = s.getsockname()[0]
s.close()
except Exception:
ip_address = "127.0.0.1"
# Get uptime in seconds
uptime_seconds = int(time.time() - psutil.boot_time())
# Get free memory in bytes
memory_info = psutil.virtual_memory()
free_memory_bytes = memory_info.available
# Simulate WiFi RSSI (for simulation, use a random value between -40 and -70 dBm)
# In real hardware this would come from the WiFi driver
import random
wifi_rssi_dbm = random.randint(-70, -40)
# Build JSON response matching W600 MCP format
status_data = {
"ip_address": ip_address,
"uptime_seconds": uptime_seconds,
"free_memory_bytes": free_memory_bytes,
"wifi_rssi_dbm": wifi_rssi_dbm,
"elm327_status": "Simulated ELM327 on Gradio"
}
# Return formatted JSON string
return json.dumps(status_data, indent=2)
def get_elm327_history(count):
"""
Get historical log of OBD-II data (RPM, speed, coolant temp) with streaming support.
Returns the last N records.
Args:
count (int): Number of most recent records to retrieve (default: 20, max: 20)
Returns:
str: Historical OBD-II data or error message
"""
import json
if count is None or count == "":
count = 20
else:
count = int(count)
# Enforce maximum limit of 20 records
if count > 20:
return json.dumps({
"error": "Request exceeds maximum limit",
"message": f"Requested {count} records, but maximum allowed is 20 records. Please request 20 or fewer records.",
"requested": count,
"max_allowed": 20
}, indent=2)
# Generate simulated test data
# Simulate a scenario where the vehicle has been running for a while
history_records = []
# Base timestamp in seconds (simulating uptime)
base_time = 0 # Start from 0 seconds
for i in range(count):
# Simulate realistic driving conditions with more variations
import math
# RPM varies between idle (800) and higher driving (up to 4500)
# Create multiple variation patterns for more realistic data
cycle_position = (i % 50) / 50.0
sine_variation = math.sin(i * 0.1) * 400 # Sine wave variation
random_noise = ((i * 7) % 300) - 150 # Random-like noise
base_rpm = 800 + 2700 * cycle_position
rpm = int(base_rpm + sine_variation + random_noise)
rpm = max(700, min(rpm, 5000)) # Clamp to realistic range
# Speed follows RPM with more variation (0-120 km/h)
# Add acceleration/deceleration patterns
speed_factor = (rpm - 800) / 35 # Convert RPM to rough speed
speed_variation = math.cos(i * 0.15) * 15 # Speed variations
speed = int(speed_factor + speed_variation + ((i * 3) % 20) - 10)
speed = max(0, min(speed, 130)) # Clamp to 0-130 km/h
# Coolant temp starts cold and warms up, then stabilizes
if i < 20:
coolant_temp = 20 + i * 3 # Warming up
elif i < 40:
coolant_temp = 60 + (i - 20) * 1 # Still warming
else:
coolant_temp = 80 + ((i * 5) % 15) # Stable operating temp with variation
coolant_temp = min(coolant_temp, 95) # Cap at 95°C
# Create record matching W600 MCP format
# Time in seconds (2 second intervals)
record = {
"seq": i,
"time": base_time + (i * 2), # Time in seconds, 2 seconds between samples
"rpm": rpm,
"speed": speed,
"coolant_temp": coolant_temp
}
history_records.append(record)
# Format as JSON array (matching MCP format)
result = json.dumps(history_records)
return result
# Interface for ELM327 commands
elm327_interface = gr.Interface(
fn=send_elm327_command,
inputs=gr.Textbox(label="Command", placeholder="e.g., ATZ, 01 0D"),
outputs=gr.Textbox(label="Response"),
title="ELM327 Commands",
description="Send commands to the ELM327 OBD-II adapter"
)
# Interface for system status
status_interface = gr.Interface(
fn=get_system_status,
inputs=None,
outputs=gr.Textbox(label="Status"),
title="System Status",
description="Get system status including IP address, network status, uptime, and memory"
)
# Interface for ELM327 history
history_interface = gr.Interface(
fn=get_elm327_history,
inputs=gr.Number(label="Count", value=20, precision=0,
info="Number of most recent records to retrieve (default: 20, max: 20)"),
outputs=gr.Textbox(label="History Data"),
title="ELM327 History",
description="Get historical log of OBD-II data (RPM, speed, coolant temp) - Maximum 20 records"
)
# Combine all interfaces with tabs
demo = gr.TabbedInterface(
[elm327_interface, status_interface, history_interface],
["ELM327 Commands", "System Status", "History"]
)
if __name__ == "__main__":
demo.launch(mcp_server=True) |