Project

General

Profile

New Model #9237 » gm30ctl.py

Alexandru Barbur, 05/29/2022 08:56 PM

 
1
#!/usr/bin/env python
2

    
3
# This script requires Python 3.
# $ pip install pyserial
# $ python gm30ctl.py
6

    
7
import io
8
import enum
9
import struct
10
import argparse
11
import typing as t
12
from pathlib import Path
13
from datetime import timedelta
14
from dataclasses import dataclass
15

    
16
import serial
17

    
18

    
19
##
# High level radio interface.
##
22

    
23
class RadioBootscreenMode(enum.Enum):
    """Selectable boot screen modes; values are auto-assigned (1, 2, 3)."""

    LOGO = enum.auto()
    MESSAGE = enum.auto()
    VOLTAGE = enum.auto()
27

    
28

    
29
@dataclass
class Radio:
    """High level container for the radio's boot screen settings."""

    # Which boot screen mode is selected.
    bootscreen_mode: RadioBootscreenMode
    # Boot screen text lines.
    # NOTE(review): presumably only shown in MESSAGE mode -- confirm.
    bootscreen_line1: str
    bootscreen_line2: str
34

    
35

    
36
class Protocol:
    """
    Radioddity GM-30 serial control and programming protocol.

    The protocol loosely follows a request/response flow with one-way or
    bi-directional acknowledgements. It relies on hardware flow control.

    Requests:
    - Command Requests
    - PSEARCH Request
    - PASSSTA Request (?)
    - SYSINFO Request (?)

    PSEARCH Request:
    - Send Bytes: 'PSEARCH' as ASCII
    - Read Ack:
      - 1x Byte: 0x06
    - Read Variable Length Response: Firmware variant name as ASCII
      - Known Variant: P13GMRS (US region GMRS firmware)

    Command Requests:
    - 1x Bytes: Request Type
    - 0x or 4x Bytes: Parameters

    Command Ack Request:
    - Type: 0x06
    - Parameters: N/A
    - Response: 0x06

    Command Read Request:
    - Radio must be in programming mode first.
    - Type: 0x52
    - Parameters:
      - 2x Bytes: Little-Endian Address
      - 1x Byte: 0x00 (?)
      - 1x Byte: Read Size
    - Response:
      - 1x Byte: 0x57
      - 2x Bytes: Little-Endian Address
      - 1x Byte: 0x00 (?)
      - 1x Byte: Read Size (Not Including 5x Byte Header)
      - Read Size x Bytes: Data
    - Requires Ack Request After

    Command Write Request:
    - Radio must be in programming mode first.
    - Type: 0x57
    - Parameters:
      - 2x Bytes: Little-Endian Address
      - 1x Byte: 0x00 (?)
      - 1x Byte: Write Size (Not Including 4x Byte Header)
      - Write Size x Bytes: Data
    - Response:
      - 1x Byte: 0x06
    """

    @staticmethod
    def open_port(device_path: Path) -> "serial.Serial":
        """
        Open the serial device at `device_path` with the settings used by
        this protocol: 57600 baud, 8N1, RTS/CTS and DSR/DTR flow control
        enabled, no software flow control, and no read timeout.
        """
        return serial.Serial(
            port=str(device_path),
            baudrate=57600,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            xonxoff=False,
            rtscts=True,
            dsrdtr=True,
            timeout=None)

    def __init__(self, port: "serial.Serial",
                 timeout: timedelta = timedelta(seconds=1)):
        """
        Wrap an already opened serial port.

        `timeout` is applied to the port before every read.
        """
        self.port = port
        self.timeout = timeout

    def _reset(self):
        """Discard whatever is pending in the port's input/output buffers."""
        # XXX: log warning if buffers are not empty
        # XXX: not sure if this is actually necessary or useful
        self.port.reset_input_buffer()
        self.port.reset_output_buffer()

    def _fixed_write(self, data: bytes) -> int:
        """
        Write `data` to the port and flush it.

        Returns whatever the port's `write()` reports (for pyserial, the
        number of bytes written).
        """
        written = self.port.write(data)
        self.port.flush()
        return written

    def _variable_read(self, max_count: int) -> bytes:
        """
        Read up to `max_count` bytes, waiting at most `self.timeout`.

        May return fewer bytes than requested (including none).
        """
        self.port.timeout = self.timeout.total_seconds()
        return self.port.read(max_count)

    def _fixed_read(self, expected_count: int) -> bytes:
        """
        Read exactly `expected_count` bytes.

        Raises RuntimeError (after resetting the port buffers) when nothing
        or a different number of bytes was received.
        """
        response = self._variable_read(expected_count)
        if not response:
            self._reset()
            raise RuntimeError("No response received")

        if len(response) != expected_count:
            self._reset()
            raise RuntimeError("Unexpected read size")

        return response

    def send_ack(self):
        """Send the single-byte acknowledgement (0x06)."""
        self._fixed_write(bytes([0x06]))

    def receive_ack(self):
        """
        Read one byte and require it to be the acknowledgement (0x06).

        Raises RuntimeError when no byte or a different byte is received.
        """
        response = self._fixed_read(1)
        if response != bytes([0x06]):
            raise RuntimeError("Failed to receive ACK")

    def read_memory(self, address: int, size: int) -> bytes:
        """
        Read `size` bytes starting at `address` with a single read request.

        `address` must fit in 16 bits and `size` in 8 bits, otherwise
        `struct.error` is raised while building the request. Raises
        RuntimeError when the response is short or its header does not echo
        the request.
        """
        # Request: 0x52 ADDRx2 0x00 SIZEx1
        request = bytearray([0x52, 0x00, 0x00, 0x00, 0x00])
        struct.pack_into('<HxB', request, 1, address, size)
        self._fixed_write(request)

        # Response: 0x57 ADDRx2 0x00 SIZEx1 [DATAx1 .. DATAx1]
        response = self._fixed_read(5 + size)
        if response[0] != 0x57 or response[1:5] != request[1:5]:
            raise RuntimeError("Read memory response invalid header")

        # Sync
        self.send_ack()
        self.receive_ack()

        return response[5:]

    def write_memory(self, address: int, data: bytes):
        """
        Write `data` starting at `address` with a single write request.

        `address` must fit in 16 bits and `len(data)` in 8 bits, otherwise
        `struct.error` is raised while building the request. Raises
        RuntimeError when the radio does not acknowledge the write.
        """
        # Request: 0x57 ADDRx2 0x00 SIZEx1 [DATAx1 .. DATAx1]
        request = bytearray([0x57, 0x00, 0x00, 0x00, 0x00])
        struct.pack_into('<HxB', request, 1, address, len(data))
        self._fixed_write(request + data)

        # Sync
        self.receive_ack()

    def read_memory_bytes(self, addresses: t.List[int]) -> bytes:
        """
        Read one byte from each address in `addresses` (one request per
        address) and return them concatenated in the same order.
        """
        data = bytearray()
        for address in addresses:
            data += self.read_memory(address, 0x01)

        return bytes(data)

    def read_memory_range(self, address: int, size: int, chunk_size: int = 0x40) -> bytes:
        """
        Read `size` bytes starting at `address`, split into requests of at
        most `chunk_size` bytes each.

        Raises ValueError when `chunk_size` is not positive (the loop could
        never make progress).
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be at least 1")

        data = bytearray()

        read_address = address
        read_bytes_remaining = size
        while read_bytes_remaining > 0:
            read_size = min(chunk_size, read_bytes_remaining)
            data += self.read_memory(read_address, read_size)

            read_bytes_remaining -= read_size
            read_address += read_size

        return bytes(data)

    def write_memory_range(self, address: int, data: bytes, chunk_size: int = 0x40):
        """
        Write `data` starting at `address`, split into requests of at most
        `chunk_size` bytes each.

        Raises ValueError when `chunk_size` is not positive (the loop could
        never make progress).
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be at least 1")

        write_counter = 0
        while write_counter < len(data):
            write_size = min(chunk_size, len(data) - write_counter)
            write_data = data[write_counter:write_counter + write_size]
            self.write_memory(address + write_counter, write_data)

            write_counter += write_size

    def query_firmware_variant(self) -> str:
        """
        Send the PSEARCH request and return the firmware variant name.

        Raises RuntimeError when the request is not acknowledged or no
        variant name follows the acknowledgement.
        """
        # Request
        self._fixed_write(b'PSEARCH')
        self.receive_ack()

        # Response: firmware variant name
        # Known variants: P13GMRS
        response = self._variable_read(16)
        if not response:
            self._reset()
            raise RuntimeError("Firmware variant query did not receive a response")

        return response.decode()
214

    
215
##
# Protocol functionality not fully understood yet.
##
218

    
219
def unknown_passsta(protocol):
    """
    Send the (not yet understood) PASSSTA request and validate its reply.

    The only reply observed so far is the three bytes 'P', 0x00, 0x00.
    Raises RuntimeError for any other reply so the check is not stripped
    when Python runs with -O.
    """
    protocol._fixed_write(b'PASSSTA')
    response = protocol._fixed_read(3)
    # Compare raw bytes: decoding an unexpected first byte could raise an
    # unrelated UnicodeDecodeError instead of a clear protocol error.
    if response != b'P\x00\x00':
        raise RuntimeError(
            f"Unexpected PASSSTA response: {response.hex(' ')}")
225

    
226

    
227
def unknown_sysinfo(protocol):
    """
    Send the (not yet understood) SYSINFO request and wait for the radio
    to acknowledge it.
    """
    protocol._fixed_write(b'SYSINFO')
    protocol.receive_ack()
230

    
231

    
232
def common_init(
    protocol,
    query_unknown_passsta = True,
    query_unknown_sysinfo = True
):
    """
    Run the handshake that puts the radio into programming mode.

    `query_unknown_passsta` controls whether the optional PASSSTA request
    is sent. `query_unknown_sysinfo` controls whether the optional 0x56
    queries that follow SYSINFO are sent (their payloads are printed as
    hex). The SYSINFO request itself is always sent.

    Raises RuntimeError when the radio reports an unknown firmware variant
    or answers any step with unexpected data.
    """
    # querying for use later on when entering programming mode
    # XXX not required to enter read/write mode
    fw_variant = protocol.query_firmware_variant()
    if fw_variant != 'P13GMRS':
        raise RuntimeError(f"Unsupported firmware variant: {fw_variant!r}")

    # XXX checking whether a password is set?
    # XXX not required to enter programming mode
    if query_unknown_passsta:
        unknown_passsta(protocol)

    # XXX required to enter programming mode
    unknown_sysinfo(protocol)

    # XXX some kind of timestamp query or checksum calculation?
    # XXX seems to change based on the contents of radio memory
    # XXX does not seem to change over time on its own
    # XXX requires sysinfo command to be sent first
    # XXX not required to enter programming mode
    if query_unknown_sysinfo:
        # XXX three queries that differ only in the third request byte
        for selector in (0x00, 0x10, 0x20):
            protocol._fixed_write(bytes([0x56, 0x00, selector, 0x0A, 0x0D]))
            response = protocol._fixed_read(3)
            if response != bytes([0x56, 0x0D, 0x0A]):
                raise RuntimeError(
                    f"Unexpected 0x56 query header: {response.hex(' ')}")

            response = protocol._fixed_read(10)
            print(response.hex(' '))

            protocol.send_ack()
            protocol.receive_ack()

        # XXX seems to be different variant then three queries above?
        protocol._fixed_write(bytes([0x56, 0x00, 0x00, 0x00, 0x0A]))
        for part_size in (3, 3, 5):
            response = protocol._fixed_read(part_size)
            print(response.hex(' '))

        protocol.send_ack()
        protocol.receive_ack()

    # XXX: this seems to set a timeout where if no further commands are
    # received within a certain window the radio will reset
    # required to enter programming mode
    protocol._fixed_write(bytes([0xFF, 0xFF, 0xFF, 0xFF, 0x0C]))
    protocol._fixed_write(fw_variant.encode()) # XXX: b'P13GMRS'
    protocol.receive_ack()

    # XXX required to enter programming mode
    protocol._fixed_write(bytes([0x02]))
    response = protocol._fixed_read(8)
    if response != bytes([0xFF] * 8):
        raise RuntimeError(
            f"Unexpected programming mode response: {response.hex(' ')}")

    protocol.send_ack()
    protocol.receive_ack()
317

    
318
##
# Console Interface
##
321

    
322
def read(device_path: Path):
    """
    Dump radio memory regions to `region_XXXX.bin` files in the current
    working directory.

    Opens the serial device at `device_path`, enters programming mode and
    then reads one probe file plus five regions of 0xFC0 bytes each.
    """
    # (start address, note) for every region dumped to region_<ADDR>.bin
    regions = [
        (0x1000, "XXX data file 0x2000, ???"),
        (0xF000, "XXX data file 0x3000 with some differences, ???"),
        (0x3000, "XXX data file 0x4000, channel names?"),
        (0xB000, "XXX data file 0x5000, general settings?"),
        (0xD000, "XXX data file 0x6000, ???"),
    ]

    with Protocol.open_port(device_path) as port:
        protocol = Protocol(port)
        common_init(protocol)

        # XXX no idea what this data is or why it's read
        # One byte from 0x1FFF, 0x2FFF, ... 0xFFFF (15 addresses).
        data = protocol.read_memory_bytes(list(range(0x1FFF, 0x10000, 0x1000)))
        with open('region_1FFF.bin', 'wb') as output_file:
            output_file.write(data)

        for address, _note in regions:
            data = protocol.read_memory_range(address, 0xFC0)
            # Yields region_1000.bin, region_F000.bin, region_3000.bin,
            # region_B000.bin and region_D000.bin.
            with open(f'region_{address:04X}.bin', 'wb') as output_file:
                output_file.write(data)
372

    
373

    
374
def write(device_path: Path):
    """
    Write `region_XXXX.bin` files from the current working directory back
    into radio memory.

    Opens the serial device at `device_path`, enters programming mode and
    then writes four regions of exactly 0xFC0 bytes each.

    Raises ValueError when a region file does not have the expected size;
    this is an explicit check so it still runs under `python -O`.
    """
    # (input file, start address, note) for every region written.
    # NOTE(review): region_7000.bin and region_4000.bin are not produced by
    # the `read` command in this file (it dumps F000 and B000 instead) --
    # confirm which addresses are correct.
    regions = [
        ('region_7000.bin', 0x7000, "XXX data file 0x3000, ???"),
        ('region_3000.bin', 0x3000, "XXX data file 0x4000, channel names?"),
        ('region_4000.bin', 0x4000, "XXX data file offset 0x5000, general settings?"),
        ('region_D000.bin', 0xD000, "XXX data file offset 0x6000, ???"),
    ]

    with Protocol.open_port(device_path) as port:
        protocol = Protocol(port)
        common_init(protocol)

        # XXX no idea what this data is or why it's read
        # One byte from 0x1FFF, 0x2FFF, ... 0xFFFF (15 addresses).
        protocol.read_memory_bytes(list(range(0x1FFF, 0x10000, 0x1000)))

        for file_name, address, _note in regions:
            with open(file_name, 'rb') as input_file:
                data = input_file.read()

            if len(data) != 0xFC0:
                raise ValueError(
                    f"{file_name}: expected {0xFC0} bytes, got {len(data)}")

            protocol.write_memory_range(address, data)
424

    
425

    
426
def main():
    """
    Parse the command line and run the selected subcommand.

    Usage: gm30ctl.py [-d DEVICE] {read,write}
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-d', '--device',
        type=Path,
        default='/dev/ttyUSB0')

    # A subcommand is mandatory: without one `args.run` would not exist and
    # the script would die with an AttributeError instead of a usage error.
    subparsers = parser.add_subparsers(dest='command', required=True)

    parser_read = subparsers.add_parser('read', help="read radio memory")
    parser_read.set_defaults(run=read)

    parser_write = subparsers.add_parser('write', help="write radio memory")
    parser_write.set_defaults(run=write)

    args = parser.parse_args()
    args.run(device_path=args.device)
443

    
444

    
445
# Run the console interface only when executed as a script.
if __name__ == '__main__':
    main()
(9-9/11)