File size: 18,267 Bytes
255a798
 
 
 
 
 
 
 
 
 
3646ccf
255a798
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f4d9469
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
import gradio as gr
import time
import random

# Vehicle state for dynamic responses
class VehicleState:
    def __init__(self):
        self.engine_running = True
        self.rpm = 800  # Idle RPM
        self.speed = 0
        self.coolant_temp = 90  # °C
        self.throttle_pos = 0
        self.maf_rate = 250  # 2.5 g/s idle (scaled by 100)
        self.fuel_level = 75  # 75%
        self.update_counter = 0

        # ELM327 settings
        self.echo_on = True
        self.linefeed_on = True
        self.headers_on = False
        self.spaces_on = True
        self.protocol = "AUTO"
        self.voltage = 13.05

    def update(self):
        """Update vehicle state with realistic variations"""
        self.update_counter += 1

        if not self.engine_running:
            return

        # RPM varies slightly at idle
        self.rpm = 800 + ((self.update_counter * 7) % 100) - 50

        # Coolant temp gradually increases when engine running
        if self.coolant_temp < 90:
            if self.update_counter % 10 == 0:
                self.coolant_temp += 1

        # Simulate some throttle/speed variation
        if self.update_counter % 50 == 0:
            self.throttle_pos = (self.update_counter // 5) % 30
            self.speed = self.throttle_pos * 2

        # MAF follows throttle roughly
        self.maf_rate = 250 + (self.throttle_pos * 5)

# Global vehicle state
vehicle_state = VehicleState()

# Pool of possible DTCs for testing
# Format: (code_bytes, description)
DTC_POOL = [
    ("01 71", "P0171 - System Too Lean (Bank 1)"),
    ("03 00", "P0300 - Random/Multiple Cylinder Misfire Detected"),
    ("04 42", "P0442 - EVAP Emission Control System Leak Detected (small leak)"),
    ("01 31", "P0131 - O2 Sensor Circuit Low Voltage (Bank 1, Sensor 1)"),
    ("01 33", "P0133 - O2 Sensor Circuit Slow Response (Bank 1, Sensor 1)"),
    ("01 71", "P0171 - System Too Lean (Bank 1)"),
    ("01 72", "P0172 - System Too Rich (Bank 1)"),
    ("02 02", "P0202 - Injector Circuit Malfunction - Cylinder 2"),
    ("03 20", "P0320 - Ignition/Distributor Engine Speed Input Circuit Malfunction"),
    ("04 20", "P0420 - Catalyst System Efficiency Below Threshold (Bank 1)"),
    ("05 00", "P0500 - Vehicle Speed Sensor Malfunction"),
    ("07 05", "P0705 - Transmission Range Sensor Circuit Malfunction (PRNDL Input)"),
    ("13 00", "P1300 - Igniter Circuit Malfunction"),
    ("00 16", "P0016 - Crankshaft Position/Camshaft Position Correlation (Bank 1)"),
    ("01 28", "P0128 - Coolant Thermostat (Coolant Temp Below Thermostat Regulating Temp)"),
]

# Mode 01 PID table (static responses)
MODE01_PID_TABLE = {
    "00": "41 00 BE 3F B8 13",  # Supported PIDs 01-20
    "01": "41 01 83 07 65 04",  # Monitor status (MIL on, 3 DTCs)
    "03": "41 03 02 00",         # Fuel system status (closed loop)
    "06": "41 06 80",            # Short term fuel trim
    "07": "41 07 80",            # Long term fuel trim
    "0A": "41 0A B4",            # Fuel pressure (540 kPa)
    "0B": "41 0B 63",            # Intake manifold pressure (99 kPa)
    "0E": "41 0E 7C",            # Timing advance (14 degrees)
    "0F": "41 0F 54",            # Intake air temperature (44°C)
    "13": "41 13 03",            # O2 sensors present
    "1C": "41 1C 01",            # OBD standard (OBD-II California ARB)
    "1F": "41 1F 00 8C",         # Run time since engine start (140s)
    "20": "41 20 80 01 80 01",   # Supported PIDs 21-40
    "21": "41 21 00 4B",         # Distance with MIL on (75 km)
    "33": "41 33 65",            # Barometric pressure (101 kPa)
    "40": "41 40 40 00 00 00",   # Supported PIDs 41-60
    "42": "41 42 32 E8",         # Control module voltage (13.05V)
    "51": "41 51 01",            # Fuel Type (Gasoline)
}

# Dynamic PIDs (generated from vehicle state)
DYNAMIC_PIDS = ["04", "05", "0C", "0D", "10", "11", "2F"]

def normalize_command(cmd):
    """Normalize command string (remove spaces, convert to uppercase)"""
    return ''.join(cmd.split()).upper()

def format_response(response, add_prompt=True):
    """Format response with ELM327 settings (echo, spaces, prompt)"""
    result = ""

    if vehicle_state.echo_on:
        result += response + "\n"

    if vehicle_state.linefeed_on:
        result += "\n"

    if add_prompt:
        result += ">"

    return result

def generate_rpm_response():
    """Generate dynamic RPM response (PID 0C)"""
    rpm_value = vehicle_state.rpm
    encoded = rpm_value * 4
    return f"41 0C {(encoded >> 8) & 0xFF:02X} {encoded & 0xFF:02X}"

def generate_speed_response():
    """Generate dynamic speed response (PID 0D)"""
    return f"41 0D {vehicle_state.speed:02X}"

def generate_coolant_temp_response():
    """Generate dynamic coolant temperature response (PID 05)"""
    encoded = vehicle_state.coolant_temp + 40
    return f"41 05 {encoded:02X}"

def generate_engine_load_response():
    """Generate dynamic engine load response (PID 04)"""
    if vehicle_state.engine_running:
        load = (vehicle_state.throttle_pos * 2 + vehicle_state.rpm // 100) // 3
        load = min(load, 100)
        encoded = (load * 255) // 100
    else:
        encoded = 0
    return f"41 04 {encoded:02X}"

def generate_maf_response():
    """Generate dynamic MAF response (PID 10)"""
    maf = vehicle_state.maf_rate
    return f"41 10 {(maf >> 8) & 0xFF:02X} {maf & 0xFF:02X}"

def generate_throttle_response():
    """Generate dynamic throttle position response (PID 11)"""
    encoded = (vehicle_state.throttle_pos * 255) // 100
    return f"41 11 {encoded:02X}"

def generate_fuel_level_response():
    """Generate dynamic fuel level response (PID 2F)"""
    encoded = (vehicle_state.fuel_level * 255) // 100
    return f"41 2F {encoded:02X}"

def handle_mode01_pid(pid):
    """Handle Mode 01 PID requests"""
    # Check if it's a dynamic PID
    if pid in DYNAMIC_PIDS:
        if pid == "0C":
            return generate_rpm_response()
        elif pid == "0D":
            return generate_speed_response()
        elif pid == "05":
            return generate_coolant_temp_response()
        elif pid == "04":
            return generate_engine_load_response()
        elif pid == "10":
            return generate_maf_response()
        elif pid == "11":
            return generate_throttle_response()
        elif pid == "2F":
            return generate_fuel_level_response()

    # Check static PID table
    if pid in MODE01_PID_TABLE:
        return MODE01_PID_TABLE[pid]

    return "NO DATA"

def generate_vin_response():
    """Generate Mode 09 PID 02 response (VIN)"""
    vin = "5TDKRKEC7PS142916"
    response = "49 02 01"
    for char in vin:
        response += f" {ord(char):02X}"
    return response

def generate_calibration_id_response():
    """Generate Mode 09 PID 04 response (Calibration ID)"""
    cal_id = "CAL123456"
    response = "49 04 01"
    for char in cal_id:
        response += f" {ord(char):02X}"
    return response

def generate_ecu_name_response():
    """Generate Mode 09 PID 0A response (ECU Name)"""
    ecu_name = "ECU_SIM_UNIT"
    response = "49 0A 01"
    for char in ecu_name:
        response += f" {ord(char):02X}"
    return response

def handle_at_command(cmd):
    """Handle AT commands"""
    global vehicle_state

    cmd_upper = cmd.upper()

    # ATZ - Reset
    if cmd_upper == "ATZ":
        vehicle_state = VehicleState()
        return "ELM327 v1.5\n\n>"

    # AT@1 - Device description
    if cmd_upper == "AT@1":
        return "OBDSIM ELM327\n\n>"

    # ATI - Version ID
    if cmd_upper == "ATI":
        return "ELM327 v1.5\n\n>"

    # ATE0 - Echo off
    if cmd_upper == "ATE0":
        vehicle_state.echo_on = False
        return "OK\n\n>"

    # ATE1 - Echo on
    if cmd_upper == "ATE1":
        vehicle_state.echo_on = True
        return "OK\n\n>"

    # ATL0 - Linefeed off
    if cmd_upper == "ATL0":
        vehicle_state.linefeed_on = False
        return "OK\n\n>"

    # ATL1 - Linefeed on
    if cmd_upper == "ATL1":
        vehicle_state.linefeed_on = True
        return "OK\n\n>"

    # ATH0 - Headers off
    if cmd_upper == "ATH0":
        vehicle_state.headers_on = False
        return "OK\n\n>"

    # ATH1 - Headers on
    if cmd_upper == "ATH1":
        vehicle_state.headers_on = True
        return "OK\n\n>"

    # ATS0 - Spaces off
    if cmd_upper == "ATS0":
        vehicle_state.spaces_on = False
        return "OK\n\n>"

    # ATS1 - Spaces on
    if cmd_upper == "ATS1":
        vehicle_state.spaces_on = True
        return "OK\n\n>"

    # ATSP - Set Protocol
    if cmd_upper.startswith("ATSP"):
        protocol_num = cmd_upper[4:] if len(cmd_upper) > 4 else "0"
        protocols = {
            "0": "AUTO",
            "1": "SAE J1850 PWM",
            "2": "SAE J1850 VPW",
            "3": "ISO 9141-2",
            "4": "ISO 14230-4 KWP",
            "5": "ISO 14230-4 KWP (fast)",
            "6": "ISO 15765-4 CAN (11 bit, 500 kbaud)",
            "7": "ISO 15765-4 CAN (29 bit, 500 kbaud)",
            "8": "ISO 15765-4 CAN (11 bit, 250 kbaud)",
            "9": "ISO 15765-4 CAN (29 bit, 250 kbaud)",
            "A": "SAE J1939 CAN",
        }
        vehicle_state.protocol = protocols.get(protocol_num, "AUTO")
        return "OK\n\n>"

    # ATDP - Describe Protocol
    if cmd_upper == "ATDP":
        return f"{vehicle_state.protocol}\n\n>"

    # ATDPN - Describe Protocol by Number
    if cmd_upper == "ATDPN":
        return "6\n\n>"  # ISO 15765-4 CAN (11 bit, 500 kbaud)

    # ATRV - Read Voltage
    if cmd_upper == "ATRV":
        return f"{vehicle_state.voltage:.1f}V\n\n>"

    # ATWS - Warm Start
    if cmd_upper == "ATWS":
        return "ELM327 v1.5\n\n>"

    # ATD - Set to Defaults
    if cmd_upper == "ATD":
        vehicle_state = VehicleState()
        return "OK\n\n>"

    # ATAT - Adaptive Timing
    if cmd_upper.startswith("ATAT"):
        return "OK\n\n>"

    # ATST - Set Timeout
    if cmd_upper.startswith("ATST"):
        return "OK\n\n>"

    # ATMA - Monitor All
    if cmd_upper == "ATMA":
        return "MONITORING...\n(Press any key to stop)\n\n>"

    # ATPC - Protocol Close
    if cmd_upper == "ATPC":
        return "OK\n\n>"

    # Default response for unknown AT commands
    return "?\n\n>"

def handle_obd_command(normalized):
    """Handle OBD-II commands"""
    # Update vehicle state
    vehicle_state.update()

    # Mode 03 - Read DTCs
    if normalized == "03":
        # Randomly select 1-5 DTCs from the pool
        num_dtcs = random.randint(1, 5)
        selected_dtcs = random.sample(DTC_POOL, min(num_dtcs, len(DTC_POOL)))

        # Build response: 43 [DTC codes...]
        response = "43"
        for dtc_code, _ in selected_dtcs:
            response += " " + dtc_code

        return response + "\n\n>"

    # Mode 07 - Read pending DTCs
    if normalized == "07":
        # Randomly select 0-2 pending DTCs from the pool
        num_pending = random.randint(0, 2)
        if num_pending == 0:
            # No pending DTCs
            response = "47"
        else:
            selected_pending = random.sample(DTC_POOL, min(num_pending, len(DTC_POOL)))
            response = "47"
            for dtc_code, _ in selected_pending:
                response += " " + dtc_code

        return response + "\n\n>"

    # Mode 04 - Clear DTCs
    if normalized == "04":
        return "44\n\n>"

    # Mode 09 - Vehicle Information
    if len(normalized) >= 4 and normalized[:2] == "09":
        mode09_pid = normalized[2:4]

        if mode09_pid == "00":
            # Supported PIDs
            response = "49 00 54 40 00 00"
        elif mode09_pid == "02":
            # VIN
            response = generate_vin_response()
        elif mode09_pid == "04":
            # Calibration ID
            response = generate_calibration_id_response()
        elif mode09_pid == "0A":
            # ECU Name
            response = generate_ecu_name_response()
        else:
            response = "NO DATA"

        return response + "\n\n>"

    # Mode 01 - Show current data
    if len(normalized) >= 4 and normalized[:2] == "01":
        pid = normalized[2:4]
        response = handle_mode01_pid(pid)
        return response + "\n\n>"

    return "NO DATA\n\n>"

def send_elm327_command(command):
    """
    Send a command to the ELM327 OBD-II adapter and get the response.

    Args:
        command (str): ELM327 command to send (e.g., 'ATZ', '01 0D')

    Returns:
        str: The response from the ELM327 adapter
    """
    if not command or not command.strip():
        return ">"

    # Normalize command
    normalized = normalize_command(command.strip())

    # Handle AT commands
    if normalized.startswith("AT"):
        return handle_at_command(normalized)

    # Handle OBD commands
    return handle_obd_command(normalized)

def get_system_status():
    """
    Get system status including IP address, network status, uptime, and memory.

    Returns:
        str: System status information
    """
    import json
    import socket
    import psutil
    import platform

    # Get IP address
    try:
        # Get the default route interface IP
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip_address = s.getsockname()[0]
        s.close()
    except Exception:
        ip_address = "127.0.0.1"

    # Get uptime in seconds
    uptime_seconds = int(time.time() - psutil.boot_time())

    # Get free memory in bytes
    memory_info = psutil.virtual_memory()
    free_memory_bytes = memory_info.available

    # Simulate WiFi RSSI (for simulation, use a random value between -40 and -70 dBm)
    # In real hardware this would come from the WiFi driver
    import random
    wifi_rssi_dbm = random.randint(-70, -40)

    # Build JSON response matching W600 MCP format
    status_data = {
        "ip_address": ip_address,
        "uptime_seconds": uptime_seconds,
        "free_memory_bytes": free_memory_bytes,
        "wifi_rssi_dbm": wifi_rssi_dbm,
        "elm327_status": "Simulated ELM327 on Gradio"
    }

    # Return formatted JSON string
    return json.dumps(status_data, indent=2)

def get_elm327_history(count):
    """
    Get historical log of OBD-II data (RPM, speed, coolant temp) with streaming support.
    Returns the last N records.

    Args:
        count (int): Number of most recent records to retrieve (default: 20, max: 20)

    Returns:
        str: Historical OBD-II data or error message
    """
    import json

    if count is None or count == "":
        count = 20
    else:
        count = int(count)

    # Enforce maximum limit of 20 records
    if count > 20:
        return json.dumps({
            "error": "Request exceeds maximum limit",
            "message": f"Requested {count} records, but maximum allowed is 20 records. Please request 20 or fewer records.",
            "requested": count,
            "max_allowed": 20
        }, indent=2)

    # Generate simulated test data
    # Simulate a scenario where the vehicle has been running for a while
    history_records = []

    # Base timestamp in seconds (simulating uptime)
    base_time = 0  # Start from 0 seconds

    for i in range(count):
        # Simulate realistic driving conditions with more variations
        import math

        # RPM varies between idle (800) and higher driving (up to 4500)
        # Create multiple variation patterns for more realistic data
        cycle_position = (i % 50) / 50.0
        sine_variation = math.sin(i * 0.1) * 400  # Sine wave variation
        random_noise = ((i * 7) % 300) - 150  # Random-like noise
        base_rpm = 800 + 2700 * cycle_position
        rpm = int(base_rpm + sine_variation + random_noise)
        rpm = max(700, min(rpm, 5000))  # Clamp to realistic range

        # Speed follows RPM with more variation (0-120 km/h)
        # Add acceleration/deceleration patterns
        speed_factor = (rpm - 800) / 35  # Convert RPM to rough speed
        speed_variation = math.cos(i * 0.15) * 15  # Speed variations
        speed = int(speed_factor + speed_variation + ((i * 3) % 20) - 10)
        speed = max(0, min(speed, 130))  # Clamp to 0-130 km/h

        # Coolant temp starts cold and warms up, then stabilizes
        if i < 20:
            coolant_temp = 20 + i * 3  # Warming up
        elif i < 40:
            coolant_temp = 60 + (i - 20) * 1  # Still warming
        else:
            coolant_temp = 80 + ((i * 5) % 15)  # Stable operating temp with variation
        coolant_temp = min(coolant_temp, 95)  # Cap at 95°C

        # Create record matching W600 MCP format
        # Time in seconds (2 second intervals)
        record = {
            "seq": i,
            "time": base_time + (i * 2),  # Time in seconds, 2 seconds between samples
            "rpm": rpm,
            "speed": speed,
            "coolant_temp": coolant_temp
        }
        history_records.append(record)

    # Format as JSON array (matching MCP format)
    result = json.dumps(history_records)

    return result

# Interface for ELM327 commands
elm327_interface = gr.Interface(
    fn=send_elm327_command,
    inputs=gr.Textbox(label="Command", placeholder="e.g., ATZ, 01 0D"),
    outputs=gr.Textbox(label="Response"),
    title="ELM327 Commands",
    description="Send commands to the ELM327 OBD-II adapter"
)

# Interface for system status
status_interface = gr.Interface(
    fn=get_system_status,
    inputs=None,
    outputs=gr.Textbox(label="Status"),
    title="System Status",
    description="Get system status including IP address, network status, uptime, and memory"
)

# Interface for ELM327 history
history_interface = gr.Interface(
    fn=get_elm327_history,
    inputs=gr.Number(label="Count", value=20, precision=0,
                     info="Number of most recent records to retrieve (default: 20, max: 20)"),
    outputs=gr.Textbox(label="History Data"),
    title="ELM327 History",
    description="Get historical log of OBD-II data (RPM, speed, coolant temp) - Maximum 20 records"
)

# Combine all interfaces with tabs
demo = gr.TabbedInterface(
    [elm327_interface, status_interface, history_interface],
    ["ELM327 Commands", "System Status", "History"]
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)