test.py

Test python script. - Pavel Milanes, 01/25/2018 06:39 am

Download (5.3 kB)

 
1
# Test script to play with the Wouxun KG-UV950P
2
# Based on the notes and work from the Wouxun KG-UV8D Plus
3
# Pavel Milanes, CO7WT, pavelmc@gmail.com
4

    
5

    
6
# some notes from the chirp's driver
7

    
8
# Figured out how the data is encrypted and implemented
9
# serial data encryption and decryption functions.
10
# The algorithm of decryption works like this:
11
# - the first byte of the data stream is XORed with the constant 57h
12
# - each next byte is encoded by previous byte using the XOR
13
#   including the checksum (e.g data[i - 1] xor data[i])
14

    
15
# Support for the Wouxun KG-UV8D Plus radio
16
# Serial coms are at 19200 baud
17
# The data is passed in variable length records
18
# Record structure:
19
#  Offset   Usage
20
#    0      start of record (\x7a)
21
#    1      Command (\x80 Identify \x81 End/Reboot \x82 Read \x83 Write)
22
#    2      direction (\xff PC-> Radio, \x00 Radio -> PC)
23
#    3      length of payload (excluding header/checksum) (n)
24
#    4      payload (n bytes)
25
#    4+n+1  checksum - byte sum (% 256) of bytes 1 -> 4+n
26
#
27
# Memory Read Records:
28
# the payload is 3 bytes, first 2 are offset (big endian),
29
# 3rd is number of bytes to read
30
# Memory Write Records:
31
# the maximum payload size (from the Wouxun software) seems to be 66 bytes
32
#  (2 bytes location + 64 bytes data).
33

    
34

    
35
def hexprint(data, addrfmt=None):
    """Return a hexdump-like encoding of @data.

    Every output line describes one block of 16 bytes: the address of the
    block, the bytes as two-digit hex values and then the same bytes as
    text, where anything that is not strictly between 0x20 and 0x7E is
    shown as a dot.

    @data is a string holding one byte per character.  When its length is
    not a multiple of 16 it is padded with NUL bytes up to a whole block,
    so the last line is always complete.  An empty @data gives "".

    @addrfmt is an optional %-style format for the address column.  It is
    expanded against the local variables of this function, so it would
    normally use "%(addr)...".  If the format cannot be applied the
    address falls back to "%03i".
    """
    if addrfmt is None:
        addrfmt = '%(addr)03i'

    block_size = 16

    # pad the data up to a whole number of blocks
    remainder = len(data) % block_size
    if remainder != 0:
        data += "\x00" * (block_size - remainder)

    out = ""

    # "//" keeps the block count an integer on Python 3 too; a plain "/"
    # gives a float there and range() refuses it
    for block in range(0, len(data) // block_size):
        addr = block * block_size
        try:
            out += addrfmt % locals()
        except (OverflowError, ValueError, TypeError, KeyError):
            out += "%03i" % addr
        out += ': '

        # thanks to the padding every chunk is a full block
        chunk = data[addr:addr + block_size]

        # hex column
        out += "".join(["%02x " % ord(char) for char in chunk])
        out += "  "

        # text column
        out += "".join([char if 0x20 < ord(char) < 0x7E else "."
                        for char in chunk])
        out += "\n"

    return out
81

    
82

    
83
def bcd_encode(val, bigendian=True, width=None):
    """Return the non-negative integer @val encoded as packed BCD.

    This is really old and shouldn't be used anymore.

    Two decimal digits are stored per byte, the more significant digit in
    the high nibble.  The result is a string with one character per byte,
    the same convention the chr()/ord() based helpers of this file use.

    @bigendian selects the byte order: most significant byte first when
    true, least significant byte first otherwise.
    @width is the minimum number of decimal digits; shorter values are
    padded with leading zero digits.  The digit count is always rounded up
    to an even number so that it fills whole bytes.

    A @val of 0 without @width gives an empty string.  A negative @val
    raises ValueError.
    """
    if val < 0:
        # the digit loop below would never reach 0 with a negative value
        raise ValueError("bcd_encode needs a non-negative value")

    digits = []
    while val != 0:
        digits.append(val % 10)
        # floor division: "/" would turn val into a float on Python 3
        val //= 10

    # pad to the requested width first ...
    while width and width > len(digits):
        digits.append(0)

    # ... and only then round up to whole bytes, otherwise an odd width
    # leaves an unpaired digit for the loop below
    if len(digits) % 2 != 0:
        digits.append(0)

    result = ""

    for i in range(0, len(digits), 2):
        # chr() instead of struct.pack("B", ...): same single byte, but it
        # needs no import
        newval = chr((digits[i + 1] << 4) | digits[i])
        if bigendian:
            result = newval + result
        else:
            result = result + newval

    return result
106

    
107

    
108
def _checksum(data):
109
    cs = 0
110
    for byte in data:
111
        cs += ord(byte)
112
    return chr(cs % 256)
113

    
114

    
115
def decrypt(data):
    """Return the decrypted version of the string @data.

    Every byte is XORed with the encrypted byte that precedes it; the
    first byte, having no predecessor, is XORed with the constant 57h.

    An empty @data gives an empty string.
    """
    result = []
    # the chain starts from the fixed seed
    previous = '\x57'
    for char in data:
        # the XOR is done in place, this is what strxor() does
        result.append(chr(ord(char) ^ ord(previous)))
        # the next byte is chained to this one as it was received
        previous = char
    return ''.join(result)
121

    
122

    
123
def encrypt(data):
    """Return the encrypted version of the string @data.

    The first byte is XORed with the constant 57h and every following
    byte is XORed with the encrypted byte produced just before it.

    An empty @data gives an empty string.
    """
    result = []
    # the chain starts from the fixed seed
    previous = '\x57'
    for char in data:
        # the XOR is done in place, this is what strxor() does; the
        # encrypted byte becomes the key for the next one
        previous = chr(ord(previous) ^ ord(char))
        result.append(previous)
    return ''.join(result)
128

    
129

    
130
def strxor(xora, xorb):
    """Return the character whose byte value is @xora XOR @xorb.

    Both arguments are single characters.
    """
    left = ord(xora)
    right = ord(xorb)
    return chr(left ^ right)
132

    
133

    
134
def _read_record(data):
135
    """Return valid, cmd, direction, size, payload """
136
    # read 4 chars for the header
137
    _header = data[0:4]
138
    # get the data len
139
    _length = ord(_header[-1])
140

    
141
    # check for data len
142
    if (_length == 0 and _header[1] == "\x80"):
143
        print("Ident record")
144
        return (True, "\x80", 0, 0, 0)
145

    
146
    if ((len(data) - 4) < (_length + 1)):
147
        print("Wrong data len in packet")
148
        print("Read data is %d, real data is %d" % (len(data) - 4,_length + 1))
149

    
150
    # get the data payload
151
    _payload = data[4:4 + _length]
152

    
153
    # get the packet checksum char
154
    _rcs = data[-1]
155

    
156
    # decrypt packet
157
    _packet = decrypt(_payload)
158

    
159
    # DEBUG
160
    print("Decrypted payload [%i][%i]\n%s" % (_length, len(_packet), hexprint(_packet)))
161

    
162
    # calc the expected checksum
163
    _ccs = ord(_checksum(_payload))
164

    
165
    # read the checksum and decrypt it
166
    #~ _ccs = ord(strxor(data[-2], _rcs_xor))
167
    #~ _rcs = data[-1]
168

    
169
    updown = "=>"
170
    # direction
171
    if (_header[2] == '\x00'):
172
        updown = "<="
173

    
174
    if (_ccs == _rcs):
175
        # valid record
176
        return (True, _header[1], updown, _length, _packet)
177
    else:
178
        return (False, 0, 0, 0, 0)
179

    
180

    
181
def l2b(data):
    """Line to bytes string.

    @data is one line of the capture: the first two characters are
    skipped and the rest is read as hex values separated by white space.

    Returns a string with one character per hex value, or an empty string
    when the line holds no values.  A token that is not valid hex raises
    ValueError.
    """
    # split() with no arguments cuts on any run of white space and never
    # yields empty tokens, so double spaces, a trailing space or the end
    # of line do not produce a token that int() would choke on
    hexd = data[2:].split()

    # convert the data in string format to byte format
    return "".join([chr(int(h, 16)) for h in hexd])
194

    
195

    
196
def dump(data):
    """Decode the record held in the capture line @data and print it.

    The line is turned into bytes and parsed as a record.  "Invalid
    packet" is printed when the record is not valid, and the direction,
    size and a hexdump of the payload are printed when the size is not 0.
    """
    valid, _cmd, direction, size, payload = _read_record(l2b(data))

    # packet validation
    if not valid:
        print("Invalid packet")

    # records without payload have nothing else to show
    if size == 0:
        return

    # print relevant info
    print("%s [%i]:\n%s" % (direction, size, hexprint(payload)))
207

    
208

    
209
# run it in the same folder as the data_down_clean.txt file
210
with open("./data_down_clean.txt") as capture:
    for line in capture:
        # only the lines whose first character is W or R are dumped
        if line.startswith(("W", "R")):
            dump(line)