import gradio as gr import time import random # Vehicle state for dynamic responses class VehicleState: def __init__(self): self.engine_running = True self.rpm = 800 # Idle RPM self.speed = 0 self.coolant_temp = 90 # °C self.throttle_pos = 0 self.maf_rate = 250 # 2.5 g/s idle (scaled by 100) self.fuel_level = 75 # 75% self.update_counter = 0 # ELM327 settings self.echo_on = True self.linefeed_on = True self.headers_on = False self.spaces_on = True self.protocol = "AUTO" self.voltage = 13.05 def update(self): """Update vehicle state with realistic variations""" self.update_counter += 1 if not self.engine_running: return # RPM varies slightly at idle self.rpm = 800 + ((self.update_counter * 7) % 100) - 50 # Coolant temp gradually increases when engine running if self.coolant_temp < 90: if self.update_counter % 10 == 0: self.coolant_temp += 1 # Simulate some throttle/speed variation if self.update_counter % 50 == 0: self.throttle_pos = (self.update_counter // 5) % 30 self.speed = self.throttle_pos * 2 # MAF follows throttle roughly self.maf_rate = 250 + (self.throttle_pos * 5) # Global vehicle state vehicle_state = VehicleState() # Pool of possible DTCs for testing # Format: (code_bytes, description) DTC_POOL = [ ("01 71", "P0171 - System Too Lean (Bank 1)"), ("03 00", "P0300 - Random/Multiple Cylinder Misfire Detected"), ("04 42", "P0442 - EVAP Emission Control System Leak Detected (small leak)"), ("01 31", "P0131 - O2 Sensor Circuit Low Voltage (Bank 1, Sensor 1)"), ("01 33", "P0133 - O2 Sensor Circuit Slow Response (Bank 1, Sensor 1)"), ("01 71", "P0171 - System Too Lean (Bank 1)"), ("01 72", "P0172 - System Too Rich (Bank 1)"), ("02 02", "P0202 - Injector Circuit Malfunction - Cylinder 2"), ("03 20", "P0320 - Ignition/Distributor Engine Speed Input Circuit Malfunction"), ("04 20", "P0420 - Catalyst System Efficiency Below Threshold (Bank 1)"), ("05 00", "P0500 - Vehicle Speed Sensor Malfunction"), ("07 05", "P0705 - Transmission Range Sensor Circuit Malfunction (PRNDL Input)"), ("13 00", "P1300 - Igniter Circuit Malfunction"), ("00 16", "P0016 - Crankshaft Position/Camshaft Position Correlation (Bank 1)"), ("01 28", "P0128 - Coolant Thermostat (Coolant Temp Below Thermostat Regulating Temp)"), ] # Mode 01 PID table (static responses) MODE01_PID_TABLE = { "00": "41 00 BE 3F B8 13", # Supported PIDs 01-20 "01": "41 01 83 07 65 04", # Monitor status (MIL on, 3 DTCs) "03": "41 03 02 00", # Fuel system status (closed loop) "06": "41 06 80", # Short term fuel trim "07": "41 07 80", # Long term fuel trim "0A": "41 0A B4", # Fuel pressure (540 kPa) "0B": "41 0B 63", # Intake manifold pressure (99 kPa) "0E": "41 0E 7C", # Timing advance (14 degrees) "0F": "41 0F 54", # Intake air temperature (44°C) "13": "41 13 03", # O2 sensors present "1C": "41 1C 01", # OBD standard (OBD-II California ARB) "1F": "41 1F 00 8C", # Run time since engine start (140s) "20": "41 20 80 01 80 01", # Supported PIDs 21-40 "21": "41 21 00 4B", # Distance with MIL on (75 km) "33": "41 33 65", # Barometric pressure (101 kPa) "40": "41 40 40 00 00 00", # Supported PIDs 41-60 "42": "41 42 32 E8", # Control module voltage (13.05V) "51": "41 51 01", # Fuel Type (Gasoline) } # Dynamic PIDs (generated from vehicle state) DYNAMIC_PIDS = ["04", "05", "0C", "0D", "10", "11", "2F"] def normalize_command(cmd): """Normalize command string (remove spaces, convert to uppercase)""" return ''.join(cmd.split()).upper() def format_response(response, add_prompt=True): """Format response with ELM327 settings (echo, spaces, prompt)""" result = "" if vehicle_state.echo_on: result += response + "\n" if vehicle_state.linefeed_on: result += "\n" if add_prompt: result += ">" return result def generate_rpm_response(): """Generate dynamic RPM response (PID 0C)""" rpm_value = vehicle_state.rpm encoded = rpm_value * 4 return f"41 0C {(encoded >> 8) & 0xFF:02X} {encoded & 0xFF:02X}" def generate_speed_response(): """Generate dynamic speed response (PID 0D)""" return f"41 0D {vehicle_state.speed:02X}" def generate_coolant_temp_response(): """Generate dynamic coolant temperature response (PID 05)""" encoded = vehicle_state.coolant_temp + 40 return f"41 05 {encoded:02X}" def generate_engine_load_response(): """Generate dynamic engine load response (PID 04)""" if vehicle_state.engine_running: load = (vehicle_state.throttle_pos * 2 + vehicle_state.rpm // 100) // 3 load = min(load, 100) encoded = (load * 255) // 100 else: encoded = 0 return f"41 04 {encoded:02X}" def generate_maf_response(): """Generate dynamic MAF response (PID 10)""" maf = vehicle_state.maf_rate return f"41 10 {(maf >> 8) & 0xFF:02X} {maf & 0xFF:02X}" def generate_throttle_response(): """Generate dynamic throttle position response (PID 11)""" encoded = (vehicle_state.throttle_pos * 255) // 100 return f"41 11 {encoded:02X}" def generate_fuel_level_response(): """Generate dynamic fuel level response (PID 2F)""" encoded = (vehicle_state.fuel_level * 255) // 100 return f"41 2F {encoded:02X}" def handle_mode01_pid(pid): """Handle Mode 01 PID requests""" # Check if it's a dynamic PID if pid in DYNAMIC_PIDS: if pid == "0C": return generate_rpm_response() elif pid == "0D": return generate_speed_response() elif pid == "05": return generate_coolant_temp_response() elif pid == "04": return generate_engine_load_response() elif pid == "10": return generate_maf_response() elif pid == "11": return generate_throttle_response() elif pid == "2F": return generate_fuel_level_response() # Check static PID table if pid in MODE01_PID_TABLE: return MODE01_PID_TABLE[pid] return "NO DATA" def generate_vin_response(): """Generate Mode 09 PID 02 response (VIN)""" vin = "5TDKRKEC7PS142916" response = "49 02 01" for char in vin: response += f" {ord(char):02X}" return response def generate_calibration_id_response(): """Generate Mode 09 PID 04 response (Calibration ID)""" cal_id = "CAL123456" response = "49 04 01" for char in cal_id: response += f" {ord(char):02X}" return response def generate_ecu_name_response(): """Generate Mode 09 PID 0A response (ECU Name)""" ecu_name = "ECU_SIM_UNIT" response = "49 0A 01" for char in ecu_name: response += f" {ord(char):02X}" return response def handle_at_command(cmd): """Handle AT commands""" global vehicle_state cmd_upper = cmd.upper() # ATZ - Reset if cmd_upper == "ATZ": vehicle_state = VehicleState() return "ELM327 v1.5\n\n>" # AT@1 - Device description if cmd_upper == "AT@1": return "OBDSIM ELM327\n\n>" # ATI - Version ID if cmd_upper == "ATI": return "ELM327 v1.5\n\n>" # ATE0 - Echo off if cmd_upper == "ATE0": vehicle_state.echo_on = False return "OK\n\n>" # ATE1 - Echo on if cmd_upper == "ATE1": vehicle_state.echo_on = True return "OK\n\n>" # ATL0 - Linefeed off if cmd_upper == "ATL0": vehicle_state.linefeed_on = False return "OK\n\n>" # ATL1 - Linefeed on if cmd_upper == "ATL1": vehicle_state.linefeed_on = True return "OK\n\n>" # ATH0 - Headers off if cmd_upper == "ATH0": vehicle_state.headers_on = False return "OK\n\n>" # ATH1 - Headers on if cmd_upper == "ATH1": vehicle_state.headers_on = True return "OK\n\n>" # ATS0 - Spaces off if cmd_upper == "ATS0": vehicle_state.spaces_on = False return "OK\n\n>" # ATS1 - Spaces on if cmd_upper == "ATS1": vehicle_state.spaces_on = True return "OK\n\n>" # ATSP - Set Protocol if cmd_upper.startswith("ATSP"): protocol_num = cmd_upper[4:] if len(cmd_upper) > 4 else "0" protocols = { "0": "AUTO", "1": "SAE J1850 PWM", "2": "SAE J1850 VPW", "3": "ISO 9141-2", "4": "ISO 14230-4 KWP", "5": "ISO 14230-4 KWP (fast)", "6": "ISO 15765-4 CAN (11 bit, 500 kbaud)", "7": "ISO 15765-4 CAN (29 bit, 500 kbaud)", "8": "ISO 15765-4 CAN (11 bit, 250 kbaud)", "9": "ISO 15765-4 CAN (29 bit, 250 kbaud)", "A": "SAE J1939 CAN", } vehicle_state.protocol = protocols.get(protocol_num, "AUTO") return "OK\n\n>" # ATDP - Describe Protocol if cmd_upper == "ATDP": return f"{vehicle_state.protocol}\n\n>" # ATDPN - Describe Protocol by Number if cmd_upper == "ATDPN": return "6\n\n>" # ISO 15765-4 CAN (11 bit, 500 kbaud) # ATRV - Read Voltage if cmd_upper == "ATRV": return f"{vehicle_state.voltage:.1f}V\n\n>" # ATWS - Warm Start if cmd_upper == "ATWS": return "ELM327 v1.5\n\n>" # ATD - Set to Defaults if cmd_upper == "ATD": vehicle_state = VehicleState() return "OK\n\n>" # ATAT - Adaptive Timing if cmd_upper.startswith("ATAT"): return "OK\n\n>" # ATST - Set Timeout if cmd_upper.startswith("ATST"): return "OK\n\n>" # ATMA - Monitor All if cmd_upper == "ATMA": return "MONITORING...\n(Press any key to stop)\n\n>" # ATPC - Protocol Close if cmd_upper == "ATPC": return "OK\n\n>" # Default response for unknown AT commands return "?\n\n>" def handle_obd_command(normalized): """Handle OBD-II commands""" # Update vehicle state vehicle_state.update() # Mode 03 - Read DTCs if normalized == "03": # Randomly select 1-5 DTCs from the pool num_dtcs = random.randint(1, 5) selected_dtcs = random.sample(DTC_POOL, min(num_dtcs, len(DTC_POOL))) # Build response: 43 [DTC codes...] response = "43" for dtc_code, _ in selected_dtcs: response += " " + dtc_code return response + "\n\n>" # Mode 07 - Read pending DTCs if normalized == "07": # Randomly select 0-2 pending DTCs from the pool num_pending = random.randint(0, 2) if num_pending == 0: # No pending DTCs response = "47" else: selected_pending = random.sample(DTC_POOL, min(num_pending, len(DTC_POOL))) response = "47" for dtc_code, _ in selected_pending: response += " " + dtc_code return response + "\n\n>" # Mode 04 - Clear DTCs if normalized == "04": return "44\n\n>" # Mode 09 - Vehicle Information if len(normalized) >= 4 and normalized[:2] == "09": mode09_pid = normalized[2:4] if mode09_pid == "00": # Supported PIDs response = "49 00 54 40 00 00" elif mode09_pid == "02": # VIN response = generate_vin_response() elif mode09_pid == "04": # Calibration ID response = generate_calibration_id_response() elif mode09_pid == "0A": # ECU Name response = generate_ecu_name_response() else: response = "NO DATA" return response + "\n\n>" # Mode 01 - Show current data if len(normalized) >= 4 and normalized[:2] == "01": pid = normalized[2:4] response = handle_mode01_pid(pid) return response + "\n\n>" return "NO DATA\n\n>" def send_elm327_command(command): """ Send a command to the ELM327 OBD-II adapter and get the response. Args: command (str): ELM327 command to send (e.g., 'ATZ', '01 0D') Returns: str: The response from the ELM327 adapter """ if not command or not command.strip(): return ">" # Normalize command normalized = normalize_command(command.strip()) # Handle AT commands if normalized.startswith("AT"): return handle_at_command(normalized) # Handle OBD commands return handle_obd_command(normalized) def get_system_status(): """ Get system status including IP address, network status, uptime, and memory. Returns: str: System status information """ import json import socket import psutil import platform # Get IP address try: # Get the default route interface IP s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip_address = s.getsockname()[0] s.close() except Exception: ip_address = "127.0.0.1" # Get uptime in seconds uptime_seconds = int(time.time() - psutil.boot_time()) # Get free memory in bytes memory_info = psutil.virtual_memory() free_memory_bytes = memory_info.available # Simulate WiFi RSSI (for simulation, use a random value between -40 and -70 dBm) # In real hardware this would come from the WiFi driver import random wifi_rssi_dbm = random.randint(-70, -40) # Build JSON response matching W600 MCP format status_data = { "ip_address": ip_address, "uptime_seconds": uptime_seconds, "free_memory_bytes": free_memory_bytes, "wifi_rssi_dbm": wifi_rssi_dbm, "elm327_status": "Simulated ELM327 on Gradio" } # Return formatted JSON string return json.dumps(status_data, indent=2) def get_elm327_history(count): """ Get historical log of OBD-II data (RPM, speed, coolant temp) with streaming support. Returns the last N records. Args: count (int): Number of most recent records to retrieve (default: 20, max: 20) Returns: str: Historical OBD-II data or error message """ import json if count is None or count == "": count = 20 else: count = int(count) # Enforce maximum limit of 20 records if count > 20: return json.dumps({ "error": "Request exceeds maximum limit", "message": f"Requested {count} records, but maximum allowed is 20 records. Please request 20 or fewer records.", "requested": count, "max_allowed": 20 }, indent=2) # Generate simulated test data # Simulate a scenario where the vehicle has been running for a while history_records = [] # Base timestamp in seconds (simulating uptime) base_time = 0 # Start from 0 seconds for i in range(count): # Simulate realistic driving conditions with more variations import math # RPM varies between idle (800) and higher driving (up to 4500) # Create multiple variation patterns for more realistic data cycle_position = (i % 50) / 50.0 sine_variation = math.sin(i * 0.1) * 400 # Sine wave variation random_noise = ((i * 7) % 300) - 150 # Random-like noise base_rpm = 800 + 2700 * cycle_position rpm = int(base_rpm + sine_variation + random_noise) rpm = max(700, min(rpm, 5000)) # Clamp to realistic range # Speed follows RPM with more variation (0-120 km/h) # Add acceleration/deceleration patterns speed_factor = (rpm - 800) / 35 # Convert RPM to rough speed speed_variation = math.cos(i * 0.15) * 15 # Speed variations speed = int(speed_factor + speed_variation + ((i * 3) % 20) - 10) speed = max(0, min(speed, 130)) # Clamp to 0-130 km/h # Coolant temp starts cold and warms up, then stabilizes if i < 20: coolant_temp = 20 + i * 3 # Warming up elif i < 40: coolant_temp = 60 + (i - 20) * 1 # Still warming else: coolant_temp = 80 + ((i * 5) % 15) # Stable operating temp with variation coolant_temp = min(coolant_temp, 95) # Cap at 95°C # Create record matching W600 MCP format # Time in seconds (2 second intervals) record = { "seq": i, "time": base_time + (i * 2), # Time in seconds, 2 seconds between samples "rpm": rpm, "speed": speed, "coolant_temp": coolant_temp } history_records.append(record) # Format as JSON array (matching MCP format) result = json.dumps(history_records) return result # Interface for ELM327 commands elm327_interface = gr.Interface( fn=send_elm327_command, inputs=gr.Textbox(label="Command", placeholder="e.g., ATZ, 01 0D"), outputs=gr.Textbox(label="Response"), title="ELM327 Commands", description="Send commands to the ELM327 OBD-II adapter" ) # Interface for system status status_interface = gr.Interface( fn=get_system_status, inputs=None, outputs=gr.Textbox(label="Status"), title="System Status", description="Get system status including IP address, network status, uptime, and memory" ) # Interface for ELM327 history history_interface = gr.Interface( fn=get_elm327_history, inputs=gr.Number(label="Count", value=20, precision=0, info="Number of most recent records to retrieve (default: 20, max: 20)"), outputs=gr.Textbox(label="History Data"), title="ELM327 History", description="Get historical log of OBD-II data (RPM, speed, coolant temp) - Maximum 20 records" ) # Combine all interfaces with tabs demo = gr.TabbedInterface( [elm327_interface, status_interface, history_interface], ["ELM327 Commands", "System Status", "History"] ) if __name__ == "__main__": demo.launch(mcp_server=True)