|
# Test script to play with the Wouxung KG-UV950P
|
|
# Based on the notes and work from the the Wouxun KG-UV8D Plus
|
|
# Pavel Milanes, CO7WT, pavelmc@gmail.com
|
|
|
|
|
|
# some notes from the chirp's driver
|
|
|
|
# Figured out how the data is encrypted and implement
|
|
# serial data encryption and decryption functions.
|
|
# The algorithm of decryption works like this:
|
|
# - the first byte of data stream is XOR by const 57h
|
|
# - each next byte is encoded by previous byte using the XOR
|
|
# including the checksum (e.g data[i - 1] xor data[i])
|
|
|
|
# Support for the Wouxun KG-UV8D Plus radio
|
|
# Serial coms are at 19200 baud
|
|
# The data is passed in variable length records
|
|
# Record structure:
|
|
# Offset Usage
|
|
# 0 start of record (\x7a)
|
|
# 1 Command (\x80 Identify \x81 End/Reboot \x82 Read \x83 Write)
|
|
# 2 direction (\xff PC-> Radio, \x00 Radio -> PC)
|
|
# 3 length of payload (excluding header/checksum) (n)
|
|
# 4 payload (n bytes)
|
|
# 4+n+1 checksum - byte sum (% 256) of bytes 1 -> 4+n
|
|
#
|
|
# Memory Read Records:
|
|
# the payload is 3 bytes, first 2 are offset (big endian),
|
|
# 3rd is number of bytes to read
|
|
# Memory Write Records:
|
|
# the maximum payload size (from the Wouxun software) seems to be 66 bytes
|
|
# (2 bytes location + 64 bytes data).
|
|
|
|
|
|
def hexprint(data, addrfmt=None):
|
|
"""Return a hexdump-like encoding of @data"""
|
|
if addrfmt is None:
|
|
addrfmt = '%(addr)03i'
|
|
|
|
block_size = 16
|
|
|
|
lines = (len(data) / block_size)
|
|
|
|
if (len(data) % block_size) != 0:
|
|
lines += 1
|
|
data += "\x00" * ((lines * block_size) - len(data))
|
|
|
|
out = ""
|
|
|
|
for block in range(0, (len(data)/block_size)):
|
|
addr = block * block_size
|
|
try:
|
|
out += addrfmt % locals()
|
|
except (OverflowError, ValueError, TypeError, KeyError):
|
|
out += "%03i" % addr
|
|
out += ': '
|
|
|
|
left = len(data) - (block * block_size)
|
|
if left < block_size:
|
|
limit = left
|
|
else:
|
|
limit = block_size
|
|
|
|
for j in range(0, limit):
|
|
out += "%02x " % ord(data[(block * block_size) + j])
|
|
|
|
out += " "
|
|
|
|
for j in range(0, limit):
|
|
char = data[(block * block_size) + j]
|
|
|
|
|
|
if ord(char) > 0x20 and ord(char) < 0x7E:
|
|
out += "%s" % char
|
|
else:
|
|
out += "."
|
|
|
|
out += "\n"
|
|
|
|
return out
|
|
|
|
|
|
def bcd_encode(val, bigendian=True, width=None):
|
|
"""This is really old and shouldn't be used anymore"""
|
|
digits = []
|
|
while val != 0:
|
|
digits.append(val % 10)
|
|
val /= 10
|
|
|
|
result = ""
|
|
|
|
if len(digits) % 2 != 0:
|
|
digits.append(0)
|
|
|
|
while width and width > len(digits):
|
|
digits.append(0)
|
|
|
|
for i in range(0, len(digits), 2):
|
|
newval = struct.pack("B", (digits[i + 1] << 4) | digits[i])
|
|
if bigendian:
|
|
result = newval + result
|
|
else:
|
|
result = result + newval
|
|
|
|
return result
|
|
|
|
|
|
def _checksum(data):
|
|
cs = 0
|
|
for byte in data:
|
|
cs += ord(byte)
|
|
return chr(cs % 256)
|
|
|
|
|
|
def decrypt(data):
|
|
result = ''
|
|
for i in range(len(data)-1, 0, -1):
|
|
result += strxor(data[i], data[i - 1])
|
|
result += strxor(data[0], '\x57')
|
|
return result[::-1]
|
|
|
|
|
|
def encrypt(data):
|
|
result = strxor('\x57', data[0])
|
|
for i in range(1, len(data), 1):
|
|
result += strxor(result[i - 1], data[i])
|
|
return result
|
|
|
|
|
|
def strxor (xora, xorb):
|
|
return chr(ord(xora) ^ ord(xorb))
|
|
|
|
|
|
def _read_record(data):
|
|
"""Return valid, cmd, direction, size, payload """
|
|
# read 4 chars for the header
|
|
_header = data[0:4]
|
|
# get the data len
|
|
_length = ord(_header[-1])
|
|
|
|
# check for data len
|
|
if (_length == 0 and _header[1] == "\x80"):
|
|
print("Ident record")
|
|
return (True, "\x80", 0, 0, 0)
|
|
|
|
if ((len(data) - 4) < (_length + 1)):
|
|
print("Wrong data len in packet")
|
|
print("Read data is %d, real data is %d" % (len(data) - 4,_length + 1))
|
|
|
|
# get the data payload
|
|
_payload = data[4:4 + _length]
|
|
|
|
# get the packet checksum char
|
|
_rcs = data[-1]
|
|
|
|
# decrypt packet
|
|
_packet = decrypt(_payload)
|
|
|
|
# DEBUG
|
|
print("Decrypted payload [%i][%i]\n%s" % (_length, len(_packet), hexprint(_packet)))
|
|
|
|
# calc the expected checksum
|
|
_ccs = ord(_checksum(_payload))
|
|
|
|
# read the checksum and decrypt it
|
|
#~ _ccs = ord(strxor(data[-2], _rcs_xor))
|
|
#~ _rcs = data[-1]
|
|
|
|
updown = "=>"
|
|
# direction
|
|
if (_header[2] == '\x00'):
|
|
updown = "<="
|
|
|
|
if (_ccs == _rcs):
|
|
# valid record
|
|
return (True, _header[1], updown, _length, _packet)
|
|
else:
|
|
return (False, 0, 0, 0, 0)
|
|
|
|
|
|
def l2b(data):
|
|
"""Line lo bytes string"""
|
|
# convert the data in string format to byte format
|
|
hexd = data[2:].split(" ")
|
|
|
|
data = ""
|
|
for h in hexd:
|
|
if (len(h) != 0):
|
|
data += chr(int(h, 16))
|
|
|
|
#~ print("Raw data [%i]\n%s" % (len(data), hexprint(data)))
|
|
|
|
return data
|
|
|
|
|
|
def dump(data):
|
|
(valid, cmd, direction, size, payload) = _read_record(l2b(data))
|
|
|
|
# packet validation
|
|
if (valid == False):
|
|
# invalid packet, finish
|
|
print("Invalid packet")
|
|
|
|
# print relevant info
|
|
if (size != 0):
|
|
print("%s [%i]:\n%s" % (direction, size, hexprint(payload)))
|
|
|
|
|
|
# run it on the same folder as the data_down_clean.txt file
|
|
with open("./data_down_clean.txt") as f:
|
|
for line in f:
|
|
if (line[0] in 'WR'):
|
|
dump(line)
|