Project

General

Profile

New Model #3509 » test.py

test program to decode radio transactions - James Lieb, 04/19/2018 01:40 PM

 
#! /usr/bin/python

# Test script to play with the Wouxun KG-UV9D-Plus
# Based on the notes and work from the Wouxun KG-UV8D Plus
# Pavel Milanes, CO7WT, pavelmc@gmail.com


# some notes from the chirp's driver

# Figured out how the data is encrypted and implemented
# serial data encryption and decryption functions.
# The algorithm of decryption works like this:
# - the first byte of data stream is XOR by const 57h
# - each next byte is encoded by previous byte using the XOR
# including the checksum (e.g data[i - 1] xor data[i])

# Support for the Wouxun KG-UV8D Plus radio
# Serial coms are at 19200 baud
# The data is passed in variable length records
# Record structure:
# Offset Usage
# 0 start of record (\x7a)
# 1 Command (\x80 Identify \x81 End/Reboot \x82 Read \x83 Write)
# 2 direction (\xff PC-> Radio, \x00 Radio -> PC)
# 3 length of payload (excluding header/checksum) (n)
# 4 payload (n bytes)
# 4+n+1 checksum - byte sum (% 256) of bytes 1 -> 4+n
#
# Memory Read Records:
# the payload is 3 bytes, first 2 are offset (big endian),
# 3rd is number of bytes to read
# Memory Write Records:
# the maximum payload size (from the Wouxun software) seems to be 66 bytes
# (2 bytes location + 64 bytes data).

import sys
import struct
import string

# Opcode (packet byte 1) -> label used when printing decoded packets
# and the per-session statistics.
cmd_name = {
    0x80: "ident",
    0x81: "hangup",
    0x82: "read config",
    0x83: "write config",
    0x84: "read channel memory",
    0x85: "write channel memory",
}

# Config addresses we can put a name on.  Anything not listed here has
# not been identified yet and is shown by its hex address instead.
config_item_type = {
    # FM broadcast presets
    0x740: "FM chan 1-8",
    0x750: "FM chan 9-16",
    0x760: "FM chan 17-20",
    # general settings
    0x8b0: "Configuration",
    0x8c0: "PTT-ANI",
    0x8d0: "SCC",
    0x8f0: "Display banner",
    # scan groups
    0x940: "Scan groups part1",
    0x980: "Scan groups part2",
    # CALL-ID table
    0x7400: "CALL-ID 1-8",
    0x7440: "CALL-ID 9-16",
    0x7480: "CALL-ID 17-20, names 0-4",
    0x74c0: "CALL-ID names 5-12",
    0x7500: "CALL-ID names 13-20",
}

# Per-opcode tallies, keyed by command byte.  "cmd_*" tables count
# packets sent to the radio, "rep_*" tables count its replies.
#   *_cnt   - packets seen
#   *_cerrs - checksum mismatches
#   *_lerrs - payload length mismatches
cmd_cnt, rep_cnt = {}, {}
cmd_cerrs, rep_cerrs = {}, {}
cmd_lerrs, rep_lerrs = {}, {}

def hexprint(data, addrfmt=None):
    """Return a hexdump-like encoding of @data.

    @data must be an indexable sequence of integers (a bytearray, or a
    bytes object on Python 3).  Each output row covers 16 bytes and has
    the form "<addr>: <hex bytes> <printable ASCII>\\n".  Bytes outside
    0x20..0x7e are shown as "." in the ASCII column.

    @addrfmt is a %-style format applied to a mapping that provides
    "addr" (offset of the row) and "block" (row index); it defaults to
    '%(addr)03i'.  A format that cannot be applied falls back to a
    three digit decimal offset.

    An empty @data yields an empty string.
    """
    if addrfmt is None:
        addrfmt = '%(addr)03i'

    block_size = 16
    rows = []

    # Stepping by block_size avoids the float row count that
    # len(data) / block_size produces on Python 3.
    for addr in range(0, len(data), block_size):
        chunk = data[addr:addr + block_size]
        try:
            prefix = addrfmt % {"addr": addr, "block": addr // block_size}
        except (OverflowError, ValueError, TypeError, KeyError):
            prefix = "%03i" % addr

        missing = block_size - len(chunk)
        # A hex cell is three characters wide ("xx "), so a short final
        # row is padded with three spaces per missing byte to keep the
        # ASCII column aligned with the rows above it.
        hex_cells = "".join("%02x " % byte for byte in chunk) + "   " * missing
        text = "".join(chr(byte) if 0x20 <= byte < 0x7F else "."
                       for byte in chunk) + " " * missing

        rows.append("%s: %s %s\n" % (prefix, hex_cells, text))

    return "".join(rows)


def bcd_encode(val, bigendian=True, width=None):
    """Pack the non-negative integer @val as BCD, two digits per byte.

    This is really old and shouldn't be used anymore.

    @bigendian selects whether the most significant digit pair comes
    first (True) or last (False).  @width, when given, is the minimum
    number of decimal digits; the value is zero padded up to it.  The
    digit count is always rounded up to an even number so that every
    byte holds two digits.

    Returns a byte string (empty when @val is 0 and no @width is given).
    Raises ValueError for a negative @val.
    """
    if val < 0:
        # Floor division never brings a negative value to zero, so the
        # digit loop below would not terminate.
        raise ValueError("bcd_encode: value must be non-negative")

    digits = []  # least significant digit first
    while val != 0:
        # divmod keeps val an integer on Python 3, unlike "val /= 10"
        val, digit = divmod(val, 10)
        digits.append(digit)

    # Pad to the requested width first, then round up to an even count;
    # doing it the other way round leaves an unpaired digit when width
    # is odd.
    if width and width > len(digits):
        digits.extend([0] * (width - len(digits)))
    if len(digits) % 2 != 0:
        digits.append(0)

    pairs = [struct.pack("B", (digits[i + 1] << 4) | digits[i])
             for i in range(0, len(digits), 2)]
    if bigendian:
        pairs.reverse()

    return b"".join(pairs)


def pkt_encode(op, payload):
    """Build a PC -> radio packet for opcode @op and obfuscate it.

    Frame layout: 0x7d start tag, @op, 0xff (direction: app to radio),
    payload length, the payload bytes, then a checksum.  The checksum is
    the sum of every byte after the start tag, truncated to its low 4
    bits (yes, only 4 bits; it looks like a firmware typo, 0xff -> 0xf,
    that never got fixed).

    Everything after the 4 byte header, checksum included, is then
    XOR-chained: the first byte is XORed with 0x52 and each following
    byte with the previous obfuscated byte.

    @payload must iterate as integers.  Returns the frame as a bytearray.
    """
    frame = bytearray([0x7d, op, 0xff, len(payload)])
    frame.extend(payload)

    # checksum runs from the opcode to the last payload byte
    frame.append(sum(frame[1:]) & 0xf)

    key = 0x52
    for pos in range(4, len(frame)):
        key ^= frame[pos]
        frame[pos] = key
    return frame

def pkt_decode(data):
    """Take a packet hot off the wire, decode it and return its fields.

    We say <<cleartext>> here because all it turns out to be is annoying
    obfuscation.  This is the inverse of pkt_encode.

    @data is a bytearray holding at least the 4 header bytes.  data[0]
    (the 0x7d start tag) is not examined here.

    Returns a tuple (cksum_match, op, direction, bytecount, payload,
    errs):
      cksum_match - True when the 4 bit checksum agrees
      op          - data[1], the command byte
      direction   - data[2]
      bytecount   - data[3], the payload length claimed by the header
      payload     - bytearray of de-obfuscated payload bytes (checksum
                    removed); its length is not checked against bytecount
      errs        - "" or a description of the checksum problem
    """
    op = data[1]
    direction = data[2]
    bytecount = data[3]

    # Undo the XOR chain: every byte was XORed with the obfuscated byte
    # before it, and the first one with 0x52.
    clear = bytearray()
    prev = 0x52
    for byte in data[4:]:
        clear.append(prev ^ byte)
        prev = byte

    if not clear:
        # Header only: there is no checksum byte to compare against.
        return (False, op, direction, bytecount, clear,
                "Packet has no checksum byte; ")

    payload = clear[:-1]
    stored = clear[-1]

    # yes, a 4 bit cksum to match the encode
    cksum = (op + direction + bytecount + sum(payload)) & 0xf
    cksum_match = cksum == stored
    if cksum_match:
        errs = ""
    else:
        # Report the masked value so both sides of the comparison are
        # the 4 bit quantities that were actually compared.
        errs = "Checksum missmatch: %x != %x; " % (cksum, stored)
    return (cksum_match, op, direction, bytecount, payload, errs)

# display helpers

def short2freq(num):
    """Format a signed 16 bit channel value as a frequency string.

    The value is printed as <num / 10>.<num % 10>, i.e. with exactly one
    decimal place.  Zero and negative values (the radio uses -1 for an
    unused channel) are shown as "----".

    NOTE(review): the original notes say the radio stores this in units
    of 10KHz, but one decimal place of MHz implies 100KHz steps - confirm
    against a capture.
    """
    if not (num > 0):
        return "----"
    whole, tenth = divmod(num, 10)
    return "%2d.%01d" % (whole, tenth)

def int2freq(num):
    """Format a signed 32 bit channel value as a frequency string in MHz.

    The radio stores the frequency in units of 10Hz, so the value splits
    into whole MHz (num / 100000) and a five digit fraction.  Zero and
    negative values (-1 marks an unused channel) give "--none---".
    """
    if not (num > 0):
        return "--none---"
    mhz, frac = divmod(num, 100000)
    return "%3d.%05d" % (mhz, frac)


def callid2str(blk):
    """Render an 8 byte caller ID record as text.

    Caller ID per MDC-1200 spec? Must be 3-6 digits (100 - 999999).
    Note that the supplied software has a bug that allows '#' and an LF
    (0x0a) to be inserted. We do not check this (yet).

    @blk must be exactly 8 bytes: six ID bytes followed by two pad bytes
    that are ignored.  ID bytes 0x00-0x09 are shown as the digits '0'-'9';
    any other byte value is passed through as the character with that
    code.  Raises struct.error when @blk is not 8 bytes long.
    """
    (raw,) = struct.unpack(">6s2x", bytes(blk))
    # string.maketrans() is gone in Python 3 and bytes.translate() wants
    # a full 256 entry table, so map the digit values by hand instead.
    return "".join(chr(byte + 0x30) if byte <= 9 else chr(byte)
                   for byte in bytearray(raw))

def str2callid(s):
    """Turn the string @s into a caller ID record.

    See callid2str. This must edit check for numbers and range (not
    done yet).  Each character is stored as its character code and two
    zero pad bytes are added at the end; the result is a bytearray of
    len(s) + 2 bytes.

    NOTE(review): callid2str maps byte values 0-9 to the digits, while
    this stores the character codes unchanged and does not pad to six
    ID bytes - confirm which encoding the radio expects before using
    this to write.
    """
    record = bytearray(ord(ch) for ch in s)
    record += bytearray(2)
    return record

def conf_item_name(item):
    """Return the label for config address @item.

    Known addresses are looked up in config_item_type; anything else is
    returned as its hex representation ("0x...").
    """
    if item not in config_item_type:
        return "0x%x" % item
    return config_item_type[item]

# config content encoder/decoders

# 0x740, 0x750, 0x760
def display_fm_chans(addr, blk):
    """Print the FM band presets contained in @blk.

    @blk is treated as a packed array of big-endian signed 16 bit
    values, one per preset:

        struct fm_chan {
            int16_t rx;
        } chan[n];

    The notes say n is a multiple of 8 in the range 8-48 (although the
    read returns 40), and that these are read in 32 byte chunks and
    written in 8 byte chunks.

    The number of the first preset is derived from @addr: two bytes per
    preset, counting from 1 at address 0x740.  One line is printed per
    preset, formatted with short2freq.
    """
    first = ((addr - 0x740) >> 1) + 1
    for index, offset in enumerate(range(0, len(blk), 2)):
        (raw,) = struct.unpack('>h', blk[offset:offset + 2])
        print("chan %2d: %s" % (first + index, short2freq(raw)))

# 0x8b0

def display_radio_conf(addr, blk):
    """Print the general radio configuration block, one byte per line.

    @addr is accepted for symmetry with the other display functions and
    is not used.  @blk is a sequence of integer byte values.

    Only byte 11 has been identified so far ("Power on display": 0x00
    is BATTERY, anything else BITMAP).  Every other byte is printed in
    hex under the placeholder name "c<index>".
    """
    known = {
        11: "Power on display",
    }
    for index, value in enumerate(blk):
        if index == 11:
            shown = "BATTERY" if value == 0x00 else "BITMAP"
        else:
            shown = "0x%02x" % value
        # Fall back to a generated name so that blocks longer than the
        # 16 bytes seen so far do not fail on a missing table entry.
        print("%s: %s" % (known.get(index, "c%d" % index), shown))

# 0x8c0

def display_ptt_id(addr, blk):
    """Print the PTT-ID block: the caller id the radio sends on transmit.

    The first 8 bytes of @blk are decoded with callid2str; whatever
    follows is not understood yet and is shown as a hex dump.  @addr is
    not used.
    """
    ident, flags = blk[0:8], blk[8:]
    print("My call ID: %s, bits %s" % (callid2str(ident), hexprint(flags)))

# 0x8d0

def display_scc_control(addr, blk):
    """Print the SCC block - the stun/kill/revive code and control?

    This is not definitive yet. It doesn't brick the radio but still...

    The first 8 bytes of @blk are decoded with callid2str and the rest
    is shown as a hex dump.  @addr is not used.
    """
    code, flags = blk[0:8], blk[8:]
    print("My SCC code: %s, bits %s" % (callid2str(code), hexprint(flags)))

# 0x940, 0x980
# Scan group state carried from the 0x940 block over to the 0x980 block.
group_chan = {}    # group index (0-9) -> (start channel, end channel)
group_names = {}   # group index (0-9) -> display name


def display_scan_group(item, blk):
    """Display the scan group list. It spans two content blocks.

    Block 0x940 is the first part followed by block 0x980 which
    contains the remaining names. In real life, these config read/write
    could come in any order. For here, we assume this order; if 0x980
    shows up first its names are printed with the channel range marked
    as unknown.

        struct scan_940 {
            struct {
                int16_t start_chan;
                int16_t end_chan;
            } groups[10];
            unsigned char[8] padding;
            struct {
                char name[8];
            } names[2];
        }

        struct scan_980 {
            struct {
                char name[8];
            } names[8]
        }

    @item is the config address.  0x940 resets the module level
    group_chan/group_names tables; any other value is handled as 0x980.
    @blk is the block contents as a bytearray.  Raises struct.error when
    @blk is shorter than the layout above.
    """
    global group_chan
    global group_names

    if item == 0x940:
        # first block so clear anything old/stale
        group_chan = {}
        group_names = {}
        # start/end pairs for all ten groups
        for i in range(0, 10):
            group_chan[i] = struct.unpack('>hh', bytes(blk[4 * i:4 * i + 4]))
        # 40 bytes of pairs + 8 bytes of padding, then the first two names
        names_at = 48
        groups = range(0, 2)
    else:
        # remaining names, assuming 0x940 came first
        names_at = 0
        groups = range(2, 10)

    for slot, i in enumerate(groups):
        start_byte = names_at + 8 * slot
        (raw,) = struct.unpack('8s', bytes(blk[start_byte:start_byte + 8]))
        # Make trailing nulls into spaces for pretty.  Decode so that
        # Python 3 prints text rather than a b'...' literal.
        group_names[i] = raw.replace(b'\0', b' ').decode('latin-1')

    # Put it together for the groups named by this block.
    print("Address: 0x%x" % item)
    for i in groups:
        if i in group_chan:
            start, end = group_chan[i]
            print("group %2d '%s': channel %i to %i"
                  % (i + 1, group_names[i], start, end))
        else:
            print("group %2d '%s': channel range unknown (0x940 block not seen)"
                  % (i + 1, group_names[i]))

# 0x7400, 0x7440, 0x7480, 0x74c0, 0x7500
# CALL-ID table state collected across the five config blocks.
callid_code = {}   # entry index (0-19) -> code text from callid2str
callid_name = {}   # entry index (0-19) -> raw 6 byte name


def display_callid(addr, blk):
    """Decode the CALL-ID table, which spans five config blocks.

    We assume here that the requests are in address order.  Every
    record is 8 bytes: six bytes of data followed by two pad bytes.
    Entries are numbered 1-20 on output.

        0x7400  codes 1-8      (resets the collected table)
        0x7440  codes 9-16
        0x7480  codes 17-20 followed by names 1-4
        0x74c0  names 5-12
        0x7500  names 13-20, then the whole table is printed

        struct callid_code_rec {
            unsigned char cid[6];
            unsigned char pad[2];
        }

        struct callid_name_rec {
            char name[6];
            char pad[2];
        }

    Any @addr other than the first four is handled as 0x7500.  Entries
    whose block was never seen are printed as '??????' instead of
    failing.  Raises struct.error when @blk is shorter than the records
    expected for @addr.
    """
    global callid_code
    global callid_name

    final_block = False
    if addr == 0x7400:
        callid_code = {}
        callid_name = {}
        code_slots, name_slots = range(0, 8), ()
    elif addr == 0x7440:
        code_slots, name_slots = range(8, 16), ()
    elif addr == 0x7480:
        code_slots, name_slots = range(16, 20), range(0, 4)
    elif addr == 0x74c0:
        code_slots, name_slots = (), range(4, 12)
    else:
        code_slots, name_slots = (), range(12, 20)
        final_block = True

    # Codes come first within a block, names after them.
    for i in code_slots:
        callid_code[i] = callid2str(blk[0:8])
        blk = blk[8:]
    for i in name_slots:
        (raw,) = struct.unpack(">6sxx", bytes(blk[0:8]))
        callid_name[i] = raw
        blk = blk[8:]

    if final_block:
        for i in range(0, 20):
            raw = callid_name.get(i)
            if raw is None:
                name = "??????"
            else:
                # nulls become spaces; decode so Python 3 prints text
                name = bytes(raw).replace(b'\0', b' ').decode('latin-1')
            code = callid_code.get(i, "??????")
            print("call-id %2d '%s': code '%s'" % (i + 1, name, code))
# Command/reply type decodes and encodes

def display_config(addr, contents):
    """Decode a config blob into something useful/readable.

    @addr is the config address and @contents the bytes read from or
    written to it (a bytearray).  A header line naming the item is
    printed first, then the block is passed to the decoder for that
    address.  Addresses without a decoder are shown as a hex dump.
    """
    item_name = conf_item_name(addr)
    print("config item %s (0x%x)" % (item_name, addr))

    if addr == 0x8b0:
        display_radio_conf(addr, contents)
    elif addr == 0x8c0:
        display_ptt_id(addr, contents)
    elif addr == 0x8d0:
        display_scc_control(addr, contents)
    elif addr == 0x8f0:
        # no menu, Banner 16 bytes, null term text followed by space (0x20)
        # Make trailing nulls into spaces for pretty.  Work on bytes and
        # decode: bytearray.replace() rejects str arguments on Python 3.
        banner = bytes(contents).replace(b'\0', b' ').decode('latin-1')
        print("%s: '%s'" % (item_name, banner))
    elif addr in (0x740, 0x750, 0x760):
        display_fm_chans(addr, contents)
    elif addr in (0x940, 0x980):
        display_scan_group(addr, contents)
    elif addr in (0x7400, 0x7440, 0x7480, 0x74c0, 0x7500):
        display_callid(addr, contents)
    else:
        print("%s" % hexprint(contents))

def display_ident(blk):
    """Print the ident block that identifies the radio and its capabilities.

    The notes say this block is always 78 bytes.  The rev == '01' is the
    base radio and '02' seems to be the '-Plus' version.

    The first 9 bytes are a 7 byte model string and a 2 byte revision.
    The remainder has not been decoded; it is split at fixed offsets
    into six parts which are each printed with their length and a hex
    dump.  A block shorter than the 9 byte model/revision header is
    reported and otherwise ignored.
    """
    if len(blk) < 9:
        print("display_ident: block too small %d" % len(blk))
        return

    (model, rev) = struct.unpack('>7s2s', bytes(blk[0:9]))
    # decode so Python 3 prints the text rather than a b'...' literal
    print("model = '%s', revision = '%s'"
          % (model.decode('latin-1'), rev.decode('latin-1')))

    rest = blk[9:]
    parts = (
        ("part1", rest[0:15]),
        ("part2", rest[15:28]),
        ("part21", rest[28:33]),
        ("part3", rest[33:48]),
        ("part4", rest[48:61]),
        ("part41", rest[61:]),
    )
    for label, part in parts:
        # each part reports its own length
        print("%s [%i]\n%s" % (label, len(part), hexprint(part)))

# Channel memory read/write decode/encode

def display_chan_blk(address, blk):
    """Print the four memory channels held in one channel memory block.

    The channel memory is composed of 96 byte blocks of the following
    structure. Each block contains four memories. The blocks are
    sequentially addressed to support the 1-999 memories.

        struct chan_blk {
            struct {
                int32_t rx;
                int32_t tx;
                unsigned char[8];
            } freq[4];
            struct {
                char name[8];
            } name[4];
        }

    The radio deletes a channel by nulling the name and setting the
    receive frequency to -1. tx freq is unchanged and bits partially
    change.

    @address is the memory address of the block and @blk its contents
    (a bytearray).  A block that is not exactly 96 bytes is reported and
    skipped.  All fields are big-endian; the 8 undecoded bytes of each
    frequency record are printed as two 32 bit hex words.
    """
    if len(blk) != 96:
        print("display_chan_blk: unexpected block size %d (expected 96)"
              % len(blk))
        return

    # First pick up the frequency and mode/modulation/??? records
    freqs = [struct.unpack('>llLL', bytes(blk[16 * i:16 * i + 16]))
             for i in range(0, 4)]

    # Followed by the names, which start after the 4 * 16 byte records.
    # Make trailing nulls into spaces for pretty; decode so Python 3
    # prints text rather than a b'...' literal.
    names = [bytes(blk[64 + 8 * i:72 + 8 * i]).replace(b'\0', b' ').decode('latin-1')
             for i in range(0, 4)]

    # Put it all together and print it
    print("Address: 0x%x" % address)
    # map memory address and record to radio channel number
    # NOTE(review): this assumes 16 bytes of address space per channel
    # starting at 0xa00 - confirm against the 96 byte block layout.
    first_chan = ((address >> 4) - 0xa0) + 1
    for i in range(0, 4):
        r, t, bitsh, bitsl = freqs[i]
        print("rec %d, ch %3d [%s]: rx=%s, tx=%s, bits=%08x-%08x"
              % (i, first_chan + i, names[i], int2freq(r), int2freq(t),
                 bitsh, bitsl))
# Process a line to/from the radio

def _read_record(lineno, data):
"""Return valid, cmd, direction, size, payload """

global checksum_seed

errs = ""
### 7D 82 00 12 52 12 22 12 22 12 22 12 ED 12 22 12 22 12 22 12 ED 12 12

if (len(data) < 4):
print("Line %d: Error: Packet too short (%d)" % (lineno, len(data)))
return (False, 0, 0, 0, 0)
if (data[0] != 0x7D):
print("Line %d: Error: Packet start is not right. %02x != 0x7D" % (lineno, data[0]))
return (False, 0, 0, 0, 0)

# Decode the packet
(ckmatch, cmd, xfer_dir, _length, payload, decode_errs) = pkt_decode(data)

# Validations
# direction
if (xfer_dir == 0x00):
direction = "reply"
to_radio = False
elif (xfer_dir == 0xFF):
direction = "command"
to_radio = True
else:
print("Line %d: Unknown direction: %02x" % (lineno, xfer_dir))
return(False, 0, 0,0,0)

# Payload length
if (_length > 0 and ((len(payload)) != _length)): #
errs += "Packet length does not match len(payload) = %d, length = %d; " % (len(payload) , _length)
if (to_radio):
if (cmd in cmd_lerrs):
cmd_lerrs[cmd] += 1
else:
cmd_lerrs[cmd] = 1
else:
if (cmd in rep_lerrs):
rep_lerrs[cmd] += 1
else:
rep_lerrs[cmd] = 1

#Checksum match
if (not ckmatch):
errs += decode_errs
if (to_radio):
if (cmd in cmd_cerrs):
cmd_cerrs[cmd] += 1
else:
cmd_cerrs[cmd] = 1
else:
if (cmd in rep_cerrs):
rep_cerrs[cmd] += 1
else:
rep_cerrs[cmd] = 1
# Count commands and replys by type
if (to_radio):
if (cmd in cmd_cnt):
cmd_cnt[cmd] += 1
else:
cmd_cnt[cmd] = 1
else:
if (cmd in rep_cnt):
rep_cnt[cmd] += 1
else:
rep_cnt[cmd] = 1

# Decode packets
print("Line %d: %s %s (%d) %s" % (lineno, cmd_name[cmd], direction, _length, errs))
if (cmd == 0x80): # Hello, ident yourself
if (to_radio):
if(_length != 0):
print("malformed ident query")
else: # Hello, ident yourself
display_ident(payload)
elif (cmd == 0x81): # Hang up. We are done. No reply from radio
if (to_radio):
if(_length != 0):
print("malformed ident query")
else: # should get no hangup reply
print("Error: radio should reboot, not reply!")
else: # these commands move data
address = payload[0]<<8 | payload[1]
if (cmd == 0x82): # Read config
if to_radio:
count = payload[2]
print("config item %s (0x%x), fetch %d" % (conf_item_name(address), address, count))
else:
item_value = payload[2:]
display_config(address, item_value)

elif (cmd == 0x83): # write config
if to_radio:
item_value = payload[2:]
display_config(address, item_value)
else:
print("%s: OK" % conf_item_name(address))

elif (cmd == 0x84): # Read channel memory
if to_radio:
count = payload[2]
print("EEPROM addr 0x%x, fetch %d" % (address, count))
else:
eeprom_contents = payload[2:]
display_chan_blk(address, eeprom_contents)

elif (cmd == 0x85): # Write channel memory
if to_radio:
eeprom_contents = payload[2:]
display_chan_blk(address, eeprom_contents)
else:
print("%s: OK" % conf_item_name(address))

else:
print("Unknown %s command: %x\n[%i]\n%s" % (direction, cmd, len(payload), hexprint(payload)))
print("\n------------------ end of line ---------------------------")
# valid record
return (True, cmd, direction, _length, payload)

def l2b(data):
    """Convert a line of hex text into bytes, safely.

    Using chr() introduces unicode code point conversions.  We use
    (force) real byte (integer) types because duck typing of strings
    bites back.

    @data is a string of hex digit pairs, as accepted by
    bytearray.fromhex().  Returns the decoded bytes as a bytearray.
    """
    raw = bytearray.fromhex(data)
    return raw


def dump(lineno, data):
    """Decode one line of captured hex text and keep the session stats.

    @lineno is the line number in the capture file and @data the hex
    text of one packet.  The packet is decoded and printed by
    _read_record.  When it is a hangup (0x81) the per-command counters
    gathered since the last hangup are printed and then reset, since a
    hangup ends a session.

    Returns the command byte of the packet, or 0 when the line is not
    valid hex or the packet is invalid.
    """
    global cmd_cnt
    global rep_cnt
    global cmd_cerrs
    global rep_cerrs
    global cmd_lerrs
    global rep_lerrs

    try:
        raw = l2b(data)
    except ValueError:
        # not hex text; report it like any other bad packet instead of
        # aborting the whole run
        print("Line %d: Error: not hex data" % lineno)
        print("Invalid packet at line %d" % lineno)
        return 0

    (valid, cmd, direction, size, payload) = _read_record(lineno, raw)

    # packet validation
    if not valid:
        # invalid packet, finish
        print("Invalid packet at line %d" % lineno)
    elif cmd == 0x81:  # roll up stats on hangup
        print("At hangup at line %d. command stats\n # cmds cksm length" % lineno)
        # The loop variables are deliberately not called "cmd": that
        # would overwrite the value returned below.
        for op in list(cmd_cnt):
            print("command %12s: %4d, %4d, %4d"
                  % (cmd_name.get(op, "0x%02x" % op), cmd_cnt[op],
                     cmd_cerrs.get(op, 0), cmd_lerrs.get(op, 0)))
        for op in list(rep_cnt):
            print("reply %12s: %4d, %4d, %4d"
                  % (cmd_name.get(op, "0x%02x" % op), rep_cnt[op],
                     rep_cerrs.get(op, 0), rep_lerrs.get(op, 0)))
        cmd_cnt = {}
        rep_cnt = {}
        cmd_cerrs = {}
        rep_cerrs = {}
        cmd_lerrs = {}
        rep_lerrs = {}
        print("========================= end of session ====================================")
    return cmd


# run it on the same folder as the data_down_clean.txt file

def main(argv):
    """Decode every tagged packet line of the capture file argv[1].

    Lines that carry a packet start with "W:" or "R:" followed by one
    separator character and the hex bytes; every other line is skipped.
    Line numbers passed on to dump() count all lines of the file,
    skipped ones included.

    NOTE(review): the original loop may have counted only the tagged
    lines - the indentation was lost; confirm which numbering is wanted.

    Returns the process exit status: 0 on success, 2 when no capture
    file was named.
    """
    if len(argv) < 2:
        sys.stderr.write("usage: %s <capture-file>\n" % argv[0])
        return 2

    with open(argv[1]) as f:
        for linenum, line in enumerate(f, 1):
            line = line.rstrip('\r\n')  # chomp
            # drop empty lines and lines w/out the magic tag.
            if len(line) > 1 and line[0] in 'WR' and line[1] == ":":
                dump(linenum, line[3:])
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))

## # dump just a few packets
## if (linenum > 3):
## break

(2-2/11)