1
|
# Test script to play with the Wouxun KG-UV950P
|
2
|
# Based on the notes and work from the Wouxun KG-UV8D Plus
|
3
|
# Pavel Milanes, CO7WT, pavelmc@gmail.com
|
4
|
|
5
|
|
6
|
# some notes from the chirp's driver
|
7
|
|
8
|
# Figured out how the data is encrypted and implement
|
9
|
# serial data encryption and decryption functions.
|
10
|
# The algorithm of decryption works like this:
|
11
|
# - the first byte of data stream is XOR by const 57h
|
12
|
# - each next byte is encoded by previous byte using the XOR
|
13
|
# including the checksum (e.g data[i - 1] xor data[i])
|
14
|
|
15
|
# Support for the Wouxun KG-UV8D Plus radio
|
16
|
# Serial coms are at 19200 baud
|
17
|
# The data is passed in variable length records
|
18
|
# Record structure:
|
19
|
# Offset Usage
|
20
|
# 0 start of record (\x7a)
|
21
|
# 1 Command (\x80 Identify \x81 End/Reboot \x82 Read \x83 Write)
|
22
|
# 2 direction (\xff PC-> Radio, \x00 Radio -> PC)
|
23
|
# 3 length of payload (excluding header/checksum) (n)
|
24
|
# 4 payload (n bytes)
|
25
|
# 4+n+1 checksum - byte sum (% 256) of bytes 1 -> 4+n
|
26
|
#
|
27
|
# Memory Read Records:
|
28
|
# the payload is 3 bytes, first 2 are offset (big endian),
|
29
|
# 3rd is number of bytes to read
|
30
|
# Memory Write Records:
|
31
|
# the maximum payload size (from the Wouxun software) seems to be 66 bytes
|
32
|
# (2 bytes location + 64 bytes data).
|
33
|
|
34
|
|
35
|
def hexprint(data, addrfmt=None):
    """Return a hexdump-like encoding of @data.

    @data is a string of single-byte characters.  It is rendered in rows of
    16 bytes; when its length is not a multiple of 16 the last row is padded
    with "\\x00" characters, which therefore show up in the dump.

    Each row looks like:

        <address>: <16 two-digit hex values, each followed by a space> <text>

    where <text> shows every character whose code is strictly between 0x20
    and 0x7E as itself and everything else as ".".

    @addrfmt is a %-format string applied to this function's local names
    (for example ``addr``, the offset of the row, or ``block``, the row
    index).  It defaults to '%(addr)03i'.  If formatting fails the address
    falls back to "%03i" % addr.

    Returns the whole dump as one string ("" for empty @data).
    """
    if addrfmt is None:
        addrfmt = '%(addr)03i'

    block_size = 16

    # Floor division: plain "/" yields a float on Python 3, which breaks
    # both the padding multiplication and range() below.
    lines = len(data) // block_size

    if (len(data) % block_size) != 0:
        lines += 1
        data += "\x00" * ((lines * block_size) - len(data))

    out = []

    # After padding, every row holds exactly block_size characters.
    for block in range(0, len(data) // block_size):
        addr = block * block_size
        try:
            # locals() is deliberate: addrfmt may reference any local name
            prefix = addrfmt % locals()
        except (OverflowError, ValueError, TypeError, KeyError):
            prefix = "%03i" % addr

        chunk = data[addr:addr + block_size]
        hexes = "".join("%02x " % ord(char) for char in chunk)
        text = "".join(
            char if 0x20 < ord(char) < 0x7E else "." for char in chunk)

        out.append(prefix + ": " + hexes + " " + text + "\n")

    return "".join(out)
|
81
|
|
82
|
|
83
|
def bcd_encode(val, bigendian=True, width=None):
    """This is really old and shouldn't be used anymore.

    Pack the decimal digits of the non-negative integer @val two per byte
    (high nibble = more significant digit) and return them as a string with
    one character per packed byte.

    @bigendian: when true the most significant byte comes first, otherwise
                the least significant byte comes first.
    @width:     minimum number of digits; the value is zero-padded up to it.
                The digit count is always rounded up to an even number so
                that every byte holds two digits.

    A @val of 0 with no @width yields an empty string.
    Raises ValueError when @val is negative.
    """
    if val < 0:
        # floor division never reaches 0 for a negative value, so the digit
        # loop below would spin forever
        raise ValueError("bcd_encode() cannot encode a negative value")

    digits = []
    while val != 0:
        digits.append(val % 10)
        # integer division on Python 2 and Python 3 alike
        val //= 10

    while width and width > len(digits):
        digits.append(0)

    # Pad to a whole number of bytes *after* applying width, otherwise an
    # odd width leaves a dangling digit and digits[i + 1] goes out of range.
    if len(digits) % 2 != 0:
        digits.append(0)

    result = ""

    for i in range(0, len(digits), 2):
        # chr() keeps the result in the same one-char-per-byte string form
        # the rest of this file works with (ord()/chr() everywhere)
        newval = chr((digits[i + 1] << 4) | digits[i])
        if bigendian:
            result = newval + result
        else:
            result = result + newval

    return result
|
106
|
|
107
|
|
108
|
def _checksum(data):
    """Return the byte-sum checksum of @data as a single character.

    The character codes of @data are added up and the total is reduced
    modulo 256; the result is returned via chr().
    """
    total = sum(ord(char) for char in data)
    return chr(total & 0xFF)
|
113
|
|
114
|
|
115
|
def decrypt(data):
    """Undo the XOR chaining applied to @data and return the plain string.

    The first character is XORed with the constant 0x57; every following
    character is XORed with the (still encrypted) character before it.

    @data is a string of single-byte characters.  An empty string decrypts
    to an empty string.
    """
    if not data:
        # nothing to decode; avoids indexing data[0] on an empty payload
        return ''

    codes = [ord(char) for char in data]

    plain = [codes[0] ^ 0x57]
    # pair every byte with its predecessor: plain[i] = data[i] ^ data[i - 1]
    plain.extend(cur ^ prev for prev, cur in zip(codes, codes[1:]))

    return ''.join(chr(code) for code in plain)
|
121
|
|
122
|
|
123
|
def encrypt(data):
    """Apply the XOR chaining to @data and return the encrypted string.

    The first character is XORed with the constant 0x57; every following
    character is XORed with the previous *encrypted* character.

    @data is a string of single-byte characters.  An empty string encrypts
    to an empty string.
    """
    if not data:
        # nothing to encode; avoids indexing data[0] on an empty payload
        return ''

    out = []
    # `key` always holds the last emitted byte (0x57 before the first one)
    key = 0x57
    for char in data:
        key ^= ord(char)
        out.append(chr(key))

    return ''.join(out)
|
128
|
|
129
|
|
130
|
def strxor(xora, xorb):
    """Return the single character whose code is ord(xora) XOR ord(xorb).

    Both arguments are expected to be one-character strings.
    """
    left, right = ord(xora), ord(xorb)
    return chr(left ^ right)
|
132
|
|
133
|
|
134
|
def _read_record(data):
    """Return valid, cmd, direction, size, payload.

    @data is one raw record as a string of single-byte characters: a 4-char
    header (start, command, direction, payload length), the encrypted
    payload and a trailing checksum character.

    Returns a 5-tuple:
      - (True, "\\x80", 0, 0, 0) for an ident record (command "\\x80" with a
        zero payload length);
      - (True, cmd, direction, size, payload) for a record whose checksum
        matches, where direction is "<=" when header byte 2 is "\\x00" and
        "=>" otherwise, and payload is the decrypted payload;
      - (False, 0, 0, 0, 0) for anything else, including input too short to
        hold a header and packets shorter than their declared length.

    Diagnostics (length mismatch, decrypted payload dump) are printed.
    """
    # read 4 chars for the header
    _header = data[0:4]
    if len(_header) < 4:
        # not even a full header: nothing can be parsed safely
        return (False, 0, 0, 0, 0)

    # get the data len
    _length = ord(_header[-1])

    # check for data len
    if (_length == 0 and _header[1] == "\x80"):
        print("Ident record")
        return (True, "\x80", 0, 0, 0)

    # payload plus one checksum char must follow the header
    _complete = (len(data) - 4) >= (_length + 1)
    if not _complete:
        print("Wrong data len in packet")
        print("Read data is %d, real data is %d" % (len(data) - 4, _length + 1))

    # get the data payload
    _payload = data[4:4 + _length]

    # get the packet checksum char, as an int so that it can be compared
    # with the computed value (comparing the raw char to an int never
    # matches)
    _rcs = ord(data[-1])

    # decrypt packet (an empty payload has nothing to decrypt)
    _packet = decrypt(_payload) if _payload else ''

    # DEBUG
    print("Decrypted payload [%i][%i]\n%s" % (_length, len(_packet), hexprint(_packet)))

    # calc the expected checksum
    # NOTE(review): this sums only the still-encrypted payload and compares
    # it with the raw last byte.  The notes at the top of this file describe
    # the checksum as covering bytes 1 -> 4+n and as being part of the XOR
    # chain - confirm the exact rule against real captures.
    _ccs = ord(_checksum(_payload))

    updown = "=>"
    # direction
    if (_header[2] == '\x00'):
        updown = "<="

    # a truncated packet is never valid, whatever its last byte happens to be
    if (_complete and _ccs == _rcs):
        # valid record
        return (True, _header[1], updown, _length, _packet)
    else:
        return (False, 0, 0, 0, 0)
|
179
|
|
180
|
|
181
|
def l2b(data):
    """Line to bytes string.

    Drops the first two characters of @data, splits the remainder on single
    spaces and turns every non-empty token, read as a base-16 number, into
    one character.  Returns the characters joined into a string.
    """
    tokens = data[2:].split(" ")
    return "".join(chr(int(token, 16)) for token in tokens if token)
|
194
|
|
195
|
|
196
|
def dump(data):
    """Decode one text line of capture data and print what it holds.

    The line is converted to a raw record with l2b() and parsed with
    _read_record().  "Invalid packet" is printed when the record does not
    validate, and a hexdump of the payload is printed, prefixed by the
    direction marker and size, whenever the reported size is non-zero.
    """
    record = _read_record(l2b(data))
    is_valid, _cmd, arrow, length, body = record

    # packet validation
    if not is_valid:
        print("Invalid packet")

    # print relevant info
    if length:
        print("%s [%i]:\n%s" % (arrow, length, hexprint(body)))
|
207
|
|
208
|
|
209
|
# Run this from the folder that holds the data_down_clean.txt file.
with open("./data_down_clean.txt") as capture:
    for entry in capture:
        # only lines starting with 'W' or 'R' are decoded
        # (presumably write/read records - confirm against the capture)
        if entry.startswith(("W", "R")):
            dump(entry)
|