Project

General

Profile

New Model #9237 » gm30ctl.py

Alexandru Barbur, 05/28/2022 09:37 PM

 
1
#!/usr/bin/env python
2

    
3
# This script requires Python 3.
4
# $ pip install pyserial
5
# $ python gm30ctl.py
6

    
7
import io
8
import struct
9
import argparse
10
import typing as t
11
from pathlib import Path
12

    
13
import serial
14

    
15

    
16
def send_ack(port):
    """Send the one-byte acknowledgement (0x06) to the device.

    Args:
        port: Open serial port, or any object providing ``write`` and
            ``flush``.
    """
    ack = b'\x06'
    port.write(ack)
    # Flush so the byte is handed to the port before the caller starts
    # waiting for the device's reply.
    port.flush()
19

    
20

    
21
def receive_ack(port):
    """Read one byte from the device and verify it is the ACK byte (0x06).

    The check is an explicit ``raise`` rather than an ``assert`` statement so
    that it still runs when Python is started with ``-O`` (which strips
    ``assert``). The exception type is kept as ``AssertionError`` so existing
    callers observe the same failure type as before.

    Args:
        port: Open serial port, or any object providing ``read``.

    Raises:
        AssertionError: If the byte read is not exactly ``0x06``. This
            includes an empty read, e.g. if the port was opened with a read
            timeout and the device did not answer.
    """
    response = port.read(1)
    if response != b'\x06':
        raise AssertionError(
            f"expected ACK (0x06) from device, received {response!r}")
24

    
25

    
26
def read_memory(port, address: int, size: int) -> bytes:
    """Read ``size`` bytes of device memory starting at ``address``.

    Wire format, as implemented here:

        Request:  0x52 ADDR(2 bytes, little-endian) 0x00 SIZE(1 byte)
        Response: 0x57 ADDR(2 bytes) 0x00 SIZE(1 byte) DATA(SIZE bytes)

    The response must echo the request's address/pad/size bytes. After a
    valid response an ACK is sent and an ACK is expected back.

    The response checks are explicit ``raise`` statements rather than
    ``assert`` so they are not stripped under ``python -O``; the exception
    type remains ``AssertionError``.

    Args:
        port: Open serial port (``write``/``flush``/``read``).
        address: Start address; must fit in an unsigned 16-bit field.
        size: Number of bytes to read; must fit in an unsigned 8-bit field.

    Returns:
        The ``size`` data bytes that follow the 5-byte response header.

    Raises:
        struct.error: If ``address`` or ``size`` does not fit its field.
        AssertionError: If the response is short, has the wrong leading
            byte, does not echo the request header, or the final ACK
            exchange fails.
    """
    # '<' disables alignment padding, so this is exactly 5 bytes:
    # opcode, address (LE), one zero pad byte, size.
    request = struct.pack('<BHxB', 0x52, address, size)

    port.write(request)
    port.flush()

    header_size = 5
    expected_response_size = header_size + size
    response = port.read(expected_response_size)

    if len(response) != expected_response_size:
        raise AssertionError(
            f"read of {size} byte(s) at 0x{address:04X}: expected "
            f"{expected_response_size} response bytes, "
            f"received {len(response)}")
    if response[0] != 0x57:
        raise AssertionError(
            f"read at 0x{address:04X}: expected response to start with "
            f"0x57, received 0x{response[0]:02X}")
    if response[1:header_size] != request[1:header_size]:
        raise AssertionError(
            f"read at 0x{address:04X}: response header "
            f"{response[1:header_size]!r} does not echo request "
            f"{request[1:header_size]!r}")

    send_ack(port)
    receive_ack(port)

    return response[header_size:]
45

    
46

    
47
def read_memory_bytes(port, addresses: t.List[int]) -> bytes:
    """Read one byte from each address and return them concatenated.

    Args:
        port: Open serial port passed through to ``read_memory``.
        addresses: Addresses to read, one byte each, in order.

    Returns:
        A bytes object holding the byte read at each address, in the same
        order as ``addresses``.
    """
    # One single-byte read_memory() transaction per address.
    return b''.join(
        read_memory(port, address, 0x01) for address in addresses)
54

    
55

    
56
def read_memory_range(port, address: int, size: int, chunk_size: int = 0x40) -> bytes:
    """Read ``size`` bytes starting at ``address`` in chunked transactions.

    The range is split into consecutive ``read_memory`` calls of at most
    ``chunk_size`` bytes each; the final chunk may be shorter.

    Args:
        port: Open serial port passed through to ``read_memory``.
        address: Address of the first byte to read.
        size: Total number of bytes to read. A value <= 0 reads nothing.
        chunk_size: Maximum bytes per transaction; must be positive.

    Returns:
        The concatenated data from all chunks (``b''`` if ``size <= 0``).

    Raises:
        ValueError: If ``chunk_size`` is not positive. Previously a value of
            0 made the loop spin forever issuing zero-length reads.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    chunks = []
    offset = 0
    while offset < size:
        read_size = min(chunk_size, size - offset)
        chunks.append(read_memory(port, address + offset, read_size))
        offset += read_size

    return b''.join(chunks)
70

    
71

    
72
def _expect(actual: bytes, expected: bytes, what: str) -> None:
    """Raise AssertionError unless ``actual`` equals ``expected``."""
    # Explicit raise (not ``assert``) so the check survives ``python -O``.
    if actual != expected:
        raise AssertionError(
            f"{what}: expected {expected!r}, received {actual!r}")


def _print_hex(data: bytes) -> None:
    """Print ``data`` as space-separated hex bytes, e.g. ``0xa 0xd 0x0``."""
    print(' '.join(f"0x{b:x}" for b in data))


def _query_and_print(port, command: bytes) -> None:
    """Send a 5-byte 0x56 query, check its 3-byte reply prefix, print 10 bytes.

    The reply must start with ``56 0D 0A``; the following 10 bytes are only
    printed, then an ACK is sent and an ACK is expected back.
    """
    port.write(command)
    port.flush()

    _expect(port.read(3), bytes([0x56, 0x0D, 0x0A]),
            f"reply prefix for query {command!r}")

    # NOTE(review): the original script carried a disabled check expecting
    # 0A 0D 00 00 00 11 CA 00 08 01 here (the same bytes were pasted for all
    # three queries). The payload meaning is unknown, so it is only printed.
    _print_hex(port.read(10))

    send_ack(port)
    receive_ack(port)


def _save_region(port, address: int, filename: str) -> None:
    """Read 0xFC0 bytes at ``address`` in 0x40-byte chunks into ``filename``."""
    # Read first so no output file is created if the transfer fails.
    data = read_memory_range(port, address, 0xFC0, 0x40)
    with open(filename, 'wb') as output_file:
        output_file.write(data)


def read(device_path: Path):
    """Run the fixed handshake against the device and dump memory to files.

    Opens ``device_path`` at 57600 baud, 8N1, with RTS/CTS and DSR/DTR flow
    control enabled, performs the command sequence below, then writes these
    files to the current working directory:

        region_1FFF.bin  one byte from each of 0x1FFF, 0x2FFF, ... 0xFFFF
        region_1000.bin, region_F000.bin, region_3000.bin,
        region_B000.bin, region_D000.bin  0xFC0 bytes each

    The meaning of most commands is not documented in this script (the
    original author marked them "XXX"); comments describe only the bytes
    exchanged.

    Args:
        device_path: Path of the serial device, e.g. ``/dev/ttyUSB0``.

    Raises:
        AssertionError: If the device sends an unexpected reply. Replies are
            compared as raw bytes, so garbage or short replies raise this
            rather than UnicodeDecodeError or IndexError.
    """
    with serial.Serial(
        port=str(device_path),
        baudrate=57600,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        xonxoff=False,
        rtscts=True,
        dsrdtr=True
    ) as port:
        # PSEARCH: device answers ACK, then the 7-byte string 'P13GMRS'.
        port.write(b'PSEARCH')
        port.flush()
        receive_ack(port)
        _expect(port.read(7), b'P13GMRS', "PSEARCH reply")

        # PASSSTA: device answers 'P' followed by two zero bytes (no ACK).
        port.write(b'PASSSTA')
        port.flush()
        _expect(port.read(3), b'P\x00\x00', "PASSSTA reply")

        # SYSINFO: device answers with a bare ACK.
        port.write(b'SYSINFO')
        port.flush()
        receive_ack(port)

        # Three 0x56 queries differing only in their third byte.
        for selector in (0x00, 0x10, 0x20):
            _query_and_print(
                port, bytes([0x56, 0x00, selector, 0x0A, 0x0D]))

        # A fourth 0x56 query with a different tail; nothing is verified
        # here, the reply is printed as 3 + 3 + 5 bytes.
        # NOTE(review): the original disabled checks expected
        # 56 0D 0A / 56 0A 08 / 00 10 00 00 FF for these three reads.
        port.write(bytes([0x56, 0x00, 0x00, 0x00, 0x0A]))
        port.flush()
        for reply_size in (3, 3, 5):
            _print_hex(port.read(reply_size))
        send_ack(port)
        receive_ack(port)

        # FF FF FF FF 0C followed by the identifier string; device ACKs.
        port.write(bytes([0xFF, 0xFF, 0xFF, 0xFF, 0x0C]))
        port.flush()
        port.write(b'P13GMRS')
        port.flush()
        receive_ack(port)

        # Single 0x02 byte; device answers with eight 0xFF bytes.
        port.write(bytes([0x02]))
        port.flush()
        _expect(port.read(8), bytes([0xFF] * 8), "reply to 0x02")
        send_ack(port)
        receive_ack(port)

        # Last byte of each 0x1000-sized page from 0x1FFF through 0xFFFF.
        data = read_memory_bytes(port, list(range(0x1FFF, 0x10000, 0x1000)))
        with open('region_1FFF.bin', 'wb') as output_file:
            output_file.write(data)

        # Region dumps, in the original order. The original comment on the
        # first one reads "data file 0x2000 offset".
        for address, filename in (
            (0x1000, 'region_1000.bin'),
            (0xF000, 'region_F000.bin'),
            (0x3000, 'region_3000.bin'),
            (0xB000, 'region_B000.bin'),
            (0xD000, 'region_D000.bin'),
        ):
            _save_region(port, address, filename)
262

    
263

    
264
def main():
    """Parse the command line and run the read sequence on the device.

    Accepts ``-d``/``--device`` (serial device path, default
    ``/dev/ttyUSB0``).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument('-d', '--device', type=Path, default='/dev/ttyUSB0')
    options = cli.parse_args()

    read(device_path=options.device)
275

    
276

    
277
# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
(4-4/11)