Project

General

Profile

New Model #9237 » gm30ctl.py

Alexandru Barbur, 05/29/2022 02:00 AM

 
1
#!/usr/bin/env python
2

    
3
# This script requires Python 3.
4
# $ pip install pyserial
5
# $ python gm30ctl.py
6

    
7
import io
8
import struct
9
import argparse
10
import typing as t
11
from pathlib import Path
12

    
13
import serial
14

    
15

    
16
def send_ack(port):
    """Transmit a single ACK byte (0x06) on ``port`` and flush it.

    ``port`` only needs ``write(bytes)`` and ``flush()``; the callers in
    this file pass an open ``serial.Serial`` instance.
    """
    ack = b'\x06'
    port.write(ack)
    port.flush()
19

    
20

    
21
def receive_ack(port):
    """Read one byte from ``port`` and require it to be ACK (0x06).

    Args:
        port: object with a ``read(n)`` method returning bytes; the
            callers in this file pass an open ``serial.Serial``.

    Raises:
        AssertionError: if no byte was returned or the byte is not 0x06.
            The type is the one the previous bare ``assert`` raised, so
            existing callers see the same exception.
    """
    response = port.read(1)
    # Raise explicitly instead of using ``assert``: assert statements are
    # removed when Python runs with -O, which would silently skip this
    # protocol check.
    if response != b'\x06':
        received = response.hex() or 'nothing'
        raise AssertionError(f"expected ACK (0x06), received {received}")
24

    
25

    
26
def read_memory(port, address: int, size: int) -> bytes:
    """Read ``size`` bytes of radio memory starting at ``address``.

    Wire format, as used by this function:
        request:  0x52, ADDR (2 bytes, little-endian), 0x00, SIZE (1 byte)
        response: 0x57, the same ADDR/0x00/SIZE fields, then SIZE data bytes
    After a valid response an ACK is sent and an ACK is awaited.

    Returns:
        The data bytes that follow the 5-byte response header.

    Raises:
        struct.error: if ``address`` does not fit in 16 bits or ``size``
            does not fit in 8 bits.
        AssertionError: if the response is short, does not start with
            0x57, or does not repeat the request's address/size fields.
    """
    header_size = 5

    # Command byte, 16-bit little-endian address, one pad byte, size byte.
    request = struct.pack('<BHxB', 0x52, address, size)
    port.write(request)
    port.flush()

    total = header_size + size
    response = port.read(total)
    assert len(response) == total
    assert response[0] == 0x57
    # Everything after the command byte must mirror what was asked for.
    assert response[1:header_size] == request[1:header_size]

    send_ack(port)
    receive_ack(port)

    return response[header_size:]
45

    
46

    
47
def write_memory(port, data: bytes, address: int):
    """Write ``data`` to radio memory starting at ``address``.

    Wire format, as used by this function:
        request: 0x57, ADDR (2 bytes, little-endian), 0x00, SIZE (1 byte),
                 followed by the SIZE data bytes
        response: a single ACK byte

    Args:
        port: open serial port (``write``, ``flush`` and ``read``).
        data: payload; its length is sent in one byte, so it must be at
            most 255 bytes.
        address: 16-bit destination address.

    Raises:
        struct.error: if ``address`` does not fit in 16 bits or
            ``len(data)`` does not fit in 8 bits.
        AssertionError: if the radio does not answer with ACK.
    """
    # Request: 0x57 ADDRx2 0x00 SIZEx1 [DATAx1 .. DATAx1]
    request = bytearray([0x57, 0x00, 0x00, 0x00, 0x00])
    # The size field is the payload length. The previous code took
    # len() of the builtin ``bytes`` type, which raises TypeError.
    struct.pack_into('<HxB', request, 1, address, len(data))

    port.write(request)
    port.write(data)
    port.flush()

    # Response
    receive_ack(port)
58

    
59

    
60
def read_memory_bytes(port, addresses: t.List[int]) -> bytes:
    """Read one byte at each of ``addresses`` and return them joined.

    The addresses are read in the order given, one ``read_memory`` call
    of size 1 per address. An empty ``addresses`` yields ``b''``.
    """
    single_bytes = [read_memory(port, address, 0x01) for address in addresses]
    return b''.join(single_bytes)
67

    
68

    
69
def read_memory_range(port, address: int, size: int, chunk_size: int = 0x40) -> bytes:
    """Read ``size`` bytes starting at ``address`` in consecutive chunks.

    Each chunk is fetched with ``read_memory``; the last chunk is shorter
    when ``size`` is not a multiple of ``chunk_size``.

    Args:
        port: open serial port passed through to ``read_memory``.
        address: address of the first byte to read.
        size: total number of bytes to read; zero or negative reads
            nothing and returns ``b''``.
        chunk_size: maximum bytes per request. ``read_memory`` packs the
            size into a single byte, so values above 255 fail there.

    Returns:
        The chunks concatenated in address order.

    Raises:
        ValueError: if ``chunk_size`` is not positive. A zero chunk size
            would otherwise never make progress and loop forever.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    chunks = []
    for offset in range(0, size, chunk_size):
        read_size = min(chunk_size, size - offset)
        chunks.append(read_memory(port, address + offset, read_size))
    return b''.join(chunks)
83

    
84

    
85
def write_memory_range(port, data: bytes, address: int, chunk_size: int = 0x40):
    """Write ``data`` to radio memory at ``address`` in consecutive chunks.

    Each chunk is sent with ``write_memory``; the last chunk is shorter
    when ``len(data)`` is not a multiple of ``chunk_size``. Empty
    ``data`` sends nothing.

    Args:
        port: open serial port passed through to ``write_memory``.
        data: bytes to write.
        address: address the first byte of ``data`` is written to.
        chunk_size: maximum bytes per request. ``write_memory`` packs the
            size into a single byte, so values above 255 fail there.

    Raises:
        ValueError: if ``chunk_size`` is not positive. A zero chunk size
            would otherwise never make progress and loop forever.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    # Walk over ``data`` itself. The previous code measured the builtin
    # ``bytes`` type, which raises TypeError.
    for offset in range(0, len(data), chunk_size):
        chunk = data[offset:offset + chunk_size]
        # write_memory takes (port, data, address); the previous call
        # passed the address and the data in each other's position.
        write_memory(port, chunk, address + offset)
93

    
94

    
95
def common_init(port):
    """Run the fixed start-up exchange used before any memory access.

    ``read`` and ``write`` both call this right after opening the port.
    The purpose of most commands is not known (the original author
    marked them ``XXX``); the byte sequences are sent exactly as
    recorded.

    Side effects: prints six raw responses to stdout as hex, and writes
    the 15 bytes probed at 0x1FFF, 0x2FFF, ... 0xFFFF to
    ``region_1FFF.bin`` in the current directory.

    Raises:
        AssertionError: if a checked response differs from what is
            expected.
    """

    def transmit(payload):
        # Every command is written and flushed straight away.
        port.write(payload)
        port.flush()

    def dump(raw):
        # Print a response as space-separated hex bytes.
        print(' '.join(f"0x{b:x}" for b in raw))

    # PSEARCH: answered with ACK and a 7-byte reply that must read P13GMRS.
    transmit(b'PSEARCH')
    receive_ack(port)
    ident = port.read(7)
    assert ident.decode() == 'P13GMRS'

    # PASSSTA (purpose unknown): reply must be 'P' followed by two zero bytes.
    transmit(b'PASSSTA')
    status = port.read(3)
    assert status[:1].decode() == 'P'
    assert status[1] == 0x00
    assert status[2] == 0x00

    # SYSINFO (purpose unknown): answered with a bare ACK.
    transmit(b'SYSINFO')
    receive_ack(port)

    # Three 0x56 queries that differ only in their third byte. Each is
    # answered by 56 0D 0A and then 10 bytes that are printed, not checked.
    # A disabled assertion in the original expected
    # 0A 0D 00 00 00 11 CA 00 08 01 for all three.
    for selector in (0x00, 0x10, 0x20):
        transmit(bytes([0x56, 0x00, selector, 0x0A, 0x0D]))
        header = port.read(3)
        assert header == bytes([0x56, 0x0D, 0x0A])
        dump(port.read(10))
        send_ack(port)
        receive_ack(port)

    # A fourth 0x56 query whose reply is read as 3 + 3 + 5 bytes and only
    # printed. Disabled assertions in the original expected
    # 56 0D 0A, then 56 0A 08, then 00 10 00 00 FF.
    transmit(bytes([0x56, 0x00, 0x00, 0x00, 0x0A]))
    for length in (3, 3, 5):
        dump(port.read(length))
    send_ack(port)
    receive_ack(port)

    # Purpose unknown: five raw bytes followed by the P13GMRS string, then ACK.
    transmit(bytes([0xFF, 0xFF, 0xFF, 0xFF, 0x0C]))
    transmit(b'P13GMRS')
    receive_ack(port)

    # Purpose unknown: command 0x02 must be answered by eight 0xFF bytes.
    transmit(bytes([0x02]))
    filler = port.read(8)
    assert filler == bytes([0xFF] * 8)
    send_ack(port)
    receive_ack(port)

    # Probe the last byte of every 0x1000-sized page from 0x1FFF to 0xFFFF.
    probe_addresses = list(range(0x1FFF, 0x10000, 0x1000))
    data = read_memory_bytes(port, probe_addresses)

    with open('region_1FFF.bin', 'wb') as output_file:
        output_file.write(data)
250

    
251

    
252
def read(device_path: Path):
    """Dump five 0xFC0-byte memory regions from the radio to files.

    Opens ``device_path`` at 57600 baud, 8 data bits, no parity, one stop
    bit, with RTS/CTS and DSR/DTR flow control enabled, runs
    ``common_init`` and then reads each region in 0x40-byte chunks. Each
    region is written to ``region_<ADDR>.bin`` in the current directory
    as soon as it has been read.
    """
    region_size = 0xFC0
    chunk_size = 0x40

    # (radio address, output file); trailing notes are the original
    # author's guesses about where the region sits in the data file.
    regions = (
        (0x1000, 'region_1000.bin'),  # XXX data file 0x2000, ???
        (0xF000, 'region_F000.bin'),  # XXX data file 0x3000 with some differences, ???
        (0x3000, 'region_3000.bin'),  # XXX data file 0x4000, channel names?
        (0xB000, 'region_B000.bin'),  # XXX data file 0x5000, general settings?
        (0xD000, 'region_D000.bin'),  # XXX data file 0x6000, ???
    )

    serial_settings = dict(
        port=str(device_path),
        baudrate=57600,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        xonxoff=False,
        rtscts=True,
        dsrdtr=True,
    )

    with serial.Serial(**serial_settings) as port:
        common_init(port)

        for region_address, filename in regions:
            data = read_memory_range(port, region_address, region_size, chunk_size)
            with open(filename, 'wb') as output_file:
                output_file.write(data)
289

    
290

    
291
def write(device_path: Path):
    """Write four 0xFC0-byte region files from the current directory to the radio.

    All input files are loaded and size-checked BEFORE the serial port is
    opened. Previously each file was opened in the middle of the session,
    so a missing or wrongly sized later file aborted the run after
    earlier regions had already been written to the radio.

    The port is opened at 57600 baud, 8 data bits, no parity, one stop
    bit, with RTS/CTS and DSR/DTR flow control enabled. ``common_init``
    runs first, then every region is written in 0x40-byte chunks in the
    order listed below.

    Raises:
        OSError: if an input file cannot be opened (for example
            FileNotFoundError).
        AssertionError: if an input file is not exactly 0xFC0 bytes long.
            The type is the one the previous ``assert`` raised.
    """
    region_size = 0xFC0
    chunk_size = 0x40

    # (input file, radio address); trailing notes are the original
    # author's guesses about where the region sits in the data file.
    # NOTE(review): read() produces region_F000.bin and region_B000.bin,
    # while this table expects region_7000.bin and region_4000.bin.
    # Whether that difference is intentional cannot be told from this
    # file; confirm before relying on a read/write round trip.
    regions = (
        ('region_7000.bin', 0x7000),  # XXX data file 0x3000, ???
        ('region_3000.bin', 0x3000),  # XXX data file 0x4000, channel names?
        ('region_4000.bin', 0x4000),  # XXX data file offset 0x5000, general settings?
        ('region_D000.bin', 0xD000),  # XXX data file offset 0x6000, ???
    )

    # Load and validate everything up front so that bad input can never
    # leave the radio partly written.
    images = []
    for filename, region_address in regions:
        with open(filename, 'rb') as input_file:
            data = input_file.read()
        # Explicit raise rather than ``assert`` so the size check still
        # runs when Python is started with -O.
        if len(data) != region_size:
            raise AssertionError(
                f"{filename}: expected {region_size} bytes, found {len(data)}")
        images.append((region_address, data))

    with serial.Serial(
        port=str(device_path),
        baudrate=57600,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        xonxoff=False,
        rtscts=True,
        dsrdtr=True
    ) as port:
        common_init(port)

        for region_address, data in images:
            write_memory_range(port, data, region_address, chunk_size)
331

    
332

    
333
def main():
    """Parse the command line and run the selected command.

    Usage: ``gm30ctl.py [-d DEVICE] {read,write}``. ``DEVICE`` defaults
    to ``/dev/ttyUSB0`` and is converted to a ``Path``. The chosen
    command's function is called with ``device_path`` set to that path.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-d', '--device',
        type=Path,
        default='/dev/ttyUSB0')

    # A command is mandatory. When it was optional, running the script
    # without one ended in an AttributeError on ``args.run`` instead of a
    # usage message. ``dest`` gives argparse a name to use in that
    # message. (``required`` needs Python 3.7 or newer.)
    subparsers = parser.add_subparsers(dest='command', required=True)

    parser_read = subparsers.add_parser('read', help="read radio memory")
    parser_read.set_defaults(run=read)

    parser_write = subparsers.add_parser('write', help="write radio memory")
    parser_write.set_defaults(run=write)

    args = parser.parse_args()
    args.run(device_path=args.device)
350

    
351

    
352
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()
(6-6/11)