|
#! /usr/bin/python
|
|
|
|
# Test script to play with the Wouxun KG-UV9D-Plus
|
|
# Based on the notes and work from the the Wouxun KG-UV8D Plus
|
|
# Pavel Milanes, CO7WT, pavelmc@gmail.com
|
|
|
|
|
|
# some notes from the chirp's driver
|
|
|
|
# Figured out how the data is encrypted and implement
|
|
# serial data encryption and decryption functions.
|
|
# The algorithm of decryption works like this:
|
|
# - the first byte of data stream is XOR by const 57h
|
|
# - each next byte is encoded by previous byte using the XOR
|
|
# including the checksum (e.g data[i - 1] xor data[i])
|
|
|
|
# Support for the Wouxun KG-UV8D Plus radio
|
|
# Serial coms are at 19200 baud
|
|
# The data is passed in variable length records
|
|
# Record structure:
|
|
# Offset Usage
|
|
# 0 start of record (\x7a)
|
|
# 1 Command (\x80 Identify \x81 End/Reboot \x82 Read \x83 Write)
|
|
# 2 direction (\xff PC-> Radio, \x00 Radio -> PC)
|
|
# 3 length of payload (excluding header/checksum) (n)
|
|
# 4 payload (n bytes)
|
|
# 4+n+1 checksum - byte sum (% 256) of bytes 1 -> 4+n
|
|
#
|
|
# Memory Read Records:
|
|
# the payload is 3 bytes, first 2 are offset (big endian),
|
|
# 3rd is number of bytes to read
|
|
# Memory Write Records:
|
|
# the maximum payload size (from the Wouxun software) seems to be 66 bytes
|
|
# (2 bytes location + 64 bytes data).
|
|
|
|
import sys
|
|
import struct
|
|
import string
|
|
|
|
cmd_name = {
|
|
0x80 : "ident",
|
|
0x81 : "hangup",
|
|
0x82 : "read config",
|
|
0x83 : "write config",
|
|
0x84 : "read channel memory",
|
|
0x85 : "write channel memory"}
|
|
|
|
# list of known config items. If it isn't here, we don't know it yet
|
|
config_item_type = {
|
|
0x740: "FM chan 1-8",
|
|
0x750: "FM chan 9-16",
|
|
0x760: "FM chan 17-20",
|
|
0x8b0: "Configuration",
|
|
0x8c0: "PTT-ANI",
|
|
0x8d0: "SCC",
|
|
0x8f0: "Display banner",
|
|
0x940: "Scan groups part1",
|
|
0x980: "Scan groups part2",
|
|
0x7400: "CALL-ID 1-8",
|
|
0x7440: "CALL-ID 9-16",
|
|
0x7480: "CALL-ID 17-20, names 0-4",
|
|
0x74c0: "CALL-ID names 5-12",
|
|
0x7500: "CALL-ID names 13-20"
|
|
}
|
|
|
|
# error counters
|
|
cmd_cnt = {}
|
|
rep_cnt = {}
|
|
cmd_cerrs = {}
|
|
rep_cerrs = {}
|
|
cmd_lerrs = {}
|
|
rep_lerrs = {}
|
|
|
|
def hexprint(data, addrfmt=None):
|
|
"""Return a hexdump-like encoding of @data"""
|
|
if addrfmt is None:
|
|
addrfmt = '%(addr)03i'
|
|
|
|
block_size = 16
|
|
|
|
lines = (len(data) / block_size)
|
|
if (len(data) % block_size > 0):
|
|
lines += 1
|
|
|
|
out = ""
|
|
left = len(data)
|
|
for block in range(0, lines):
|
|
addr = block * block_size
|
|
try:
|
|
out += addrfmt % locals()
|
|
except (OverflowError, ValueError, TypeError, KeyError):
|
|
out += "%03i" % addr
|
|
out += ': '
|
|
|
|
if left < block_size:
|
|
limit = left
|
|
else:
|
|
limit = block_size
|
|
|
|
for j in range(0, block_size):
|
|
if (j < limit):
|
|
out += "%02x " % data[(block * block_size) + j]
|
|
else:
|
|
out += " "
|
|
|
|
out += " "
|
|
|
|
for j in range(0, block_size):
|
|
|
|
if (j < limit):
|
|
_byte = data[(block * block_size) + j]
|
|
if _byte >= 0x20 and _byte < 0x7F:
|
|
out += "%s" % chr(_byte)
|
|
else:
|
|
out += "."
|
|
else:
|
|
out += " "
|
|
out += "\n"
|
|
if (left > block_size):
|
|
left -= block_size
|
|
|
|
return out
|
|
|
|
|
|
def bcd_encode(val, bigendian=True, width=None):
|
|
"""This is really old and shouldn't be used anymore"""
|
|
digits = []
|
|
while val != 0:
|
|
digits.append(val % 10)
|
|
val /= 10
|
|
|
|
result = ""
|
|
|
|
if len(digits) % 2 != 0:
|
|
digits.append(0)
|
|
|
|
while width and width > len(digits):
|
|
digits.append(0)
|
|
|
|
for i in range(0, len(digits), 2):
|
|
newval = struct.pack("B", (digits[i + 1] << 4) | digits[i])
|
|
if bigendian:
|
|
result = newval + result
|
|
else:
|
|
result = result + newval
|
|
|
|
return result
|
|
|
|
|
|
def pkt_encode(op, payload):
|
|
"""Assemble a packet for the radio and encode it for transmission.
|
|
Yes indeed, the checksum we store is only 4 bits. Why, I suspect it's a bug in
|
|
the radio firmware guys didn't want to fix, i.e. a typo 0xff -> 0xf..."""
|
|
|
|
data = bytearray()
|
|
data.append(0x7d) # tag that marks the beginning of the packet
|
|
data.append(op)
|
|
data.append(0xff) # 0xff is from app to radio
|
|
data.append(len(payload))
|
|
|
|
# calc checksum from op to end
|
|
cksum = op + 0xff + len(payload)
|
|
for byte in payload:
|
|
cksum += byte
|
|
data.append(byte)
|
|
data.append(cksum & 0xf) # Yea, this is a 4 bit cksum (also known as a bug)
|
|
|
|
# now obfuscate by an xor starting with first payload byte ^ 0x52 including the trailing cksum.
|
|
xorbits = 0x52
|
|
for i, byte in enumerate(data[4:]):
|
|
xord = xorbits ^ byte
|
|
data[i + 4] = xord
|
|
xorbits = xord
|
|
return(data)
|
|
|
|
def pkt_decode(data):
|
|
"""Take a packet hot off the wire and decode it into clear text and return the fields.
|
|
We say <<cleartext>> here because all it turns out to be is annoying obfuscation.
|
|
This is the inverse of pkt_decode"""
|
|
|
|
errs = ""
|
|
|
|
# we don't care about data[0]. It is always 0x7d sl leave it alone
|
|
op = data[1]
|
|
direction = data[2]
|
|
bytecount = data[3]
|
|
|
|
# First un-obfuscate the payload and cksum
|
|
payload = bytearray()
|
|
xorbits = 0x52
|
|
for i, byte in enumerate(data[4:]):
|
|
payload.append(xorbits ^ byte)
|
|
xorbits = byte
|
|
|
|
# Calculate the checksum starting with the 3 bytes of the header
|
|
cksum = op + direction + bytecount
|
|
for byte in payload[:-1]:
|
|
cksum += byte
|
|
cksum_match = (cksum & 0xf) == payload[-1] # yes, a 4 bit cksum to match the encode
|
|
if (not cksum_match):
|
|
errs += "Checksum missmatch: %x != %x; " % (cksum, payload[-1])
|
|
return (cksum_match, op, direction, bytecount, payload[:-1], errs)
|
|
|
|
# display helpers
|
|
|
|
def short2freq(num):
|
|
""" Convert a signed 16 bit integer into a frequency string expressed in MHz
|
|
This is stored in the radio as units of 10KHz which we compensate to Hz.
|
|
A value of -1 indicates <no freqency>, i.e. unused channel.
|
|
"""
|
|
|
|
if (num > 0):
|
|
return "%2d.%01d" % (num/10, num % 10)
|
|
else:
|
|
return "----"
|
|
|
|
def int2freq(num):
|
|
""" Convert a signed 32 bit integer into a frequency string expressed in MHz
|
|
This is stored in the radio as units of 10Hz which we compensate to Hz.
|
|
A value of -1 indicates <no freqency>, i.e. unused channel.
|
|
"""
|
|
|
|
if (num > 0):
|
|
return "%3d.%05d" % (num/100000, num % 100000)
|
|
else:
|
|
return "--none---"
|
|
|
|
|
|
def callid2str(blk):
|
|
"""Caller ID per MDC-1200 spec? Must be 3-6 digits (100 - 999999).
|
|
Note that the supplied software has a bug that allows '#' and an LF (0x0a)
|
|
to be inserted. We do not check this (yet)
|
|
"""
|
|
|
|
bin2ascii = string.maketrans("\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09",
|
|
"0123456789")
|
|
cid = struct.unpack(">6s2x", blk)
|
|
return cid[0].translate(bin2ascii)
|
|
|
|
def str2callid(s):
|
|
""" See callid2str. This must edit check for numbers and range
|
|
"""
|
|
blk = bytearray()
|
|
for c in s:
|
|
blk.append(ord(c))
|
|
blk.append(0x00)
|
|
blk.append(0x00)
|
|
return blk
|
|
|
|
def conf_item_name(item):
|
|
""" Return a name for a config item/addres if we know it
|
|
"""
|
|
if (item in config_item_type):
|
|
name = config_item_type[item]
|
|
else:
|
|
name = "0x%x" % item
|
|
return name
|
|
|
|
# config content encoder/decoders
|
|
|
|
# 0x740, 0x750, 0x760
|
|
def display_fm_chans(addr, blk):
|
|
""" Display FM band channels. These are stored as 16 bit signed integers in units
|
|
of 10KHz. They seem to be read in 32 byte chunks and written in 8 byte chunks.
|
|
This is a simple structure.
|
|
|
|
struct fm_chan {
|
|
int16_t rx;
|
|
} chan[n]; /* where n is a multiple of 8 in range 8-48 although the read returns 40
|
|
"""
|
|
chan = ((addr - 0x740)>>1) + 1
|
|
while (len(blk) > 0):
|
|
f = struct.unpack('>h', blk[0:2])
|
|
print("chan %2d: %s" % (chan, short2freq(f[0])))
|
|
blk = blk[2:]
|
|
chan += 1
|
|
|
|
# 0x8b0
|
|
|
|
def display_radio_conf(addr, blk):
|
|
"""General radio configuration controls
|
|
"""
|
|
cnames = {
|
|
0: "c0",
|
|
1: "c1",
|
|
2: "c2",
|
|
3: "c3",
|
|
4: "c4",
|
|
5: "c5",
|
|
6: "c6",
|
|
7: "c7",
|
|
8: "c8",
|
|
9: "c9",
|
|
10: "c10",
|
|
11: "Power on display",
|
|
12: "c12",
|
|
13: "c13",
|
|
14: "c14",
|
|
15: "c15"
|
|
}
|
|
|
|
for i, name in enumerate(blk):
|
|
if (i == 11):
|
|
if (blk[i] == 0x00):
|
|
val = "BATTERY"
|
|
else:
|
|
val = "BITMAP"
|
|
else:
|
|
val = "0x%02x" % blk[i]
|
|
print("%s: %s" % (cnames[i], val))
|
|
|
|
# 0x8c0
|
|
|
|
def display_ptt_id(addr, blk):
|
|
"""Radio sends its caller id on transmission
|
|
"""
|
|
cid = callid2str(blk[0:8])
|
|
bits = blk[8:]
|
|
print("My call ID: %s, bits %s" % (cid, hexprint(bits)))
|
|
|
|
# 0x8d0
|
|
|
|
def display_scc_control(addr, blk):
|
|
"""This is the stun/kill/revive code and control?
|
|
This is not definitive yet. It doesn't brick the radio but still...
|
|
"""
|
|
scc = callid2str(blk[0:8])
|
|
bits = blk[8:]
|
|
print("My SCC code: %s, bits %s" % (scc, hexprint(bits)))
|
|
|
|
|
|
# 0x940, 0x980
|
|
group_chan = {}
|
|
group_names = {}
|
|
|
|
def display_scan_group(item, blk):
|
|
""" Display the scan group list. It spans two content blocks
|
|
block 0x940 is the first part followed by block 0x980 which
|
|
contains the remaining names. In real life, these config read/write
|
|
could come in any order. for here, we assume this order.
|
|
|
|
struct scan_940 {
|
|
struct {
|
|
int16_t start_chan;
|
|
int16_t end_chan;
|
|
} groups[10];
|
|
unsigned char[8] padding;
|
|
struct {
|
|
char name[8];
|
|
} names[2];
|
|
}
|
|
|
|
struct scan_980 {
|
|
struct {
|
|
char name[8];
|
|
} names[8]
|
|
}
|
|
"""
|
|
global group_chan
|
|
global group_names
|
|
|
|
if (item == 0x940):
|
|
# first pick up the start end pairs for each group
|
|
group_chan = {}
|
|
group_names = {} # first block so clear anything old/stale
|
|
for i in range(0,10):
|
|
rec = blk[0:4]
|
|
(start, end) = struct.unpack('>hh', rec)
|
|
group_chan[i] = (start, end)
|
|
blk = blk[4:]
|
|
blk = blk[8:] # align over padding
|
|
|
|
# Followed by the first two group names
|
|
for i in range(0,2):
|
|
rec = blk[0:8]
|
|
name = struct.unpack('8s', rec)
|
|
group_names[i] = name[0].replace('\0', ' ') # make trailing nulls into spaces for pretty
|
|
blk = blk[8:]
|
|
|
|
# Put it together for the first two groups.
|
|
print("Address: 0x%x" % item)
|
|
for i in range(0,2):
|
|
start, end = group_chan[i]
|
|
print("group %2d '%s': channel %i to %i" % (i+1, group_names[i], start, end))
|
|
else:
|
|
# We unpack and display the remaining groups assuming 0x940 came first...
|
|
for i in range(2,10):
|
|
rec = blk[0:8]
|
|
name = struct.unpack('8s', rec)
|
|
group_names[i] = name[0].replace('\0', ' ') # make trailing nulls into spaces for pretty
|
|
blk = blk[8:]
|
|
|
|
# Put it together for the first two groups.
|
|
print("Address: 0x%x" % item)
|
|
for i in range(2,10):
|
|
start, end = group_chan[i]
|
|
print("group %2d '%s': channel %i to %i" % (i+1, group_names[i], start, end))
|
|
|
|
# 0x7400, 0x7440, 0x7480, 0x74c0, 0x7500
|
|
callid_code = {}
|
|
callid_name = {}
|
|
|
|
def display_callid(addr, blk):
|
|
"""Decode CALL-ID table which spans four 64 byte transfers.
|
|
We assume here that the requests are in address order.
|
|
There are two codes/names per line as 0 and 1 but both names
|
|
and codes are 1-20
|
|
|
|
struct callid_7400_7440 {
|
|
struct {
|
|
unsigned char cid0[6];
|
|
unsigned char pad0[2];
|
|
unsigned char cid1[6];
|
|
unsigned char pad1[2];
|
|
} callids[4];
|
|
}
|
|
|
|
struct callid_7800 {
|
|
struct {
|
|
unsigned char cid0[6];
|
|
unsigned char pad0;
|
|
unsigned char cid1[6];
|
|
unsigned char pad1;
|
|
} callids[2];
|
|
struct {
|
|
char name0[6];
|
|
char pad0[2]
|
|
char name1[6];
|
|
char pad1[2]
|
|
} names[2]
|
|
}
|
|
|
|
struct callid_78c0_7500 {
|
|
struct {
|
|
char name0[6];
|
|
char pad0[2]
|
|
char name1[6];
|
|
char pad1[2]
|
|
} names[2]
|
|
}
|
|
"""
|
|
global callid_code
|
|
global callid_name
|
|
|
|
if (addr == 0x7400): # callid codes 1-8
|
|
callid_code = {}
|
|
callid_name = {}
|
|
for i in range(0,8):
|
|
cid = callid2str(blk[0:8])
|
|
callid_code[i] = cid
|
|
blk = blk[8:]
|
|
|
|
elif (addr == 0x7440): # callid codes 9-15
|
|
for i in range(8,16):
|
|
rec = blk[0:8]
|
|
cid = callid2str(blk[0:8])
|
|
callid_code[i] = cid
|
|
blk = blk[8:]
|
|
|
|
elif (addr == 0x7480): # callid codes 16-20, names 1-4
|
|
for i in range(16,20):
|
|
rec = blk[0:8]
|
|
cid = callid2str(blk[0:8])
|
|
callid_code[i] = cid
|
|
blk = blk[8:]
|
|
for i in range(0,4):
|
|
rec = blk[0:8]
|
|
name = struct.unpack(">6sxx", rec)
|
|
callid_name[i] = name[0]
|
|
blk = blk[8:]
|
|
|
|
elif (addr == 0x74c0): # names 5-13
|
|
for i in range(4,12):
|
|
rec = blk[0:8]
|
|
name = struct.unpack(">6sxx", rec)
|
|
callid_name[i] = name[0]
|
|
blk = blk[8:]
|
|
|
|
else: # 0x7500 names 14-20, print them.
|
|
for i in range(12,20):
|
|
rec = blk[0:8]
|
|
name = struct.unpack(">6sxx", rec)
|
|
callid_name[i] = name[0]
|
|
blk = blk[8:]
|
|
for i in range(0,20):
|
|
name = callid_name[i].replace('\0', ' ')
|
|
code = callid_code[i]
|
|
print("call-id %2d '%s': code '%s'" % (i+1, name, code))
|
|
|
|
# Command/reply type decodes and encodes
|
|
|
|
def display_config(addr, contents):
|
|
"""Decode config addr blobs into something useful/readable
|
|
"""
|
|
|
|
item_name = conf_item_name(addr)
|
|
print("config item %s (0x%x)" % (item_name, addr))
|
|
if (addr == 0x8b0):
|
|
display_radio_conf(addr, contents)
|
|
elif (addr == 0x8c0):
|
|
display_ptt_id(addr, contents)
|
|
elif (addr == 0x8d0):
|
|
display_scc_control(addr, contents)
|
|
elif (addr == 0x8f0): # no menu, Banner 16 bytes, null term text followed by space (0x20)
|
|
print("%s: '%s'" % (item_name, contents.replace('\0', ' '))) # make trailing nulls into spaces for pretty
|
|
elif (addr == 0x740 or addr == 0x750 or addr == 0x760):
|
|
display_fm_chans(addr, contents)
|
|
elif (addr == 0x940 or addr == 0x980):
|
|
display_scan_group(addr, contents)
|
|
elif (addr == 0x7400 or addr == 0x7440 or addr == 0x7480 or addr == 0x74c0 or addr == 0x7500):
|
|
display_callid(addr, contents)
|
|
else:
|
|
print("%s" % hexprint(contents))
|
|
|
|
def display_ident(blk):
|
|
"""The ident block identifies the radio and its capabilities. This block is always 78 bytes
|
|
The rev == '01' is the base radio and '02' seems to be the '-Plus' version
|
|
"""
|
|
|
|
fmt = '>7s2s'
|
|
rec = blk[0:9]
|
|
rest = blk[9:]
|
|
part1 = rest[0:15]
|
|
part2 = rest[15:28]
|
|
part21 = rest[28:33]
|
|
part3 = rest[33:48]
|
|
part4 = rest[48:61]
|
|
part41 = rest[61:]
|
|
(model, rev) = struct.unpack(fmt, rec)
|
|
print("model = '%s', revision = '%s'" % (model, rev))
|
|
print("part1 [%i]\n%s" % (len(part1), hexprint(part1)))
|
|
print("part2 [%i]\n%s" % (len(part2), hexprint(part2)))
|
|
print("part21 [%i]\n%s" % (len(part2), hexprint(part21)))
|
|
print("part3 [%i]\n%s" % (len(part3), hexprint(part3)))
|
|
print("part4 [%i]\n%s" % (len(part3), hexprint(part4)))
|
|
print("part41 [%i]\n%s" % (len(part3), hexprint(part41)))
|
|
|
|
# Channel memory read/write decode/encode
|
|
|
|
def display_chan_blk(address, blk):
|
|
"""The channel memory is composed of 96 byte blocks of the following
|
|
structure. Each block contains four memories. The blocks are sequencially
|
|
addressed to support the 1-999 memories.
|
|
|
|
struct chan_blk {
|
|
struct {
|
|
int32_t rx;
|
|
int32_t tx;
|
|
unsigned char[8];
|
|
} freq[4];
|
|
struct {
|
|
char name[8];
|
|
} name[8];
|
|
}
|
|
|
|
The radio deletes a channel by nulling the name and setting the receive
|
|
frequency to -1. tx freq is unchanged and bits partially change.
|
|
"""
|
|
if (len(blk) != 96):
|
|
print("display_chan_blk: block too small %d" % len(blk))
|
|
return
|
|
|
|
# First pick up the frequency and mode/modulation/??? records
|
|
freqs = {}
|
|
for i in range(0,4):
|
|
rec = blk[0:16]
|
|
(r, t, bitsh, bitsl) = struct.unpack('>llLL', rec)
|
|
freqs[i] = (r, t, bitsh, bitsl)
|
|
blk = blk[16:]
|
|
|
|
# Followed by the names
|
|
names = {}
|
|
for i in range(0,4):
|
|
rec = blk[0:8]
|
|
name = struct.unpack('8s', rec)
|
|
names[i] = name[0].replace('\0', ' ') # make trailing nulls into spaces for pretty
|
|
blk = blk[8:]
|
|
|
|
# Put it all together and print it
|
|
print("Address: 0x%x" % address)
|
|
for i in range(0,4):
|
|
r, t, bitsh, bitsl = freqs[i]
|
|
chan = ((address>>4) - 0xa0) + 1 +i # map memory address and record to radio channel number
|
|
print("rec %d, ch %3d [%s]: rx=%s, tx=%s, bits=%08x-%08x" % (i, chan, names[i], int2freq(r), int2freq(t),
|
|
bitsh, bitsl))
|
|
|
|
# Process a line to/from the radio
|
|
|
|
def _read_record(lineno, data):
|
|
"""Return valid, cmd, direction, size, payload """
|
|
|
|
global checksum_seed
|
|
|
|
errs = ""
|
|
|
|
### 7D 82 00 12 52 12 22 12 22 12 22 12 ED 12 22 12 22 12 22 12 ED 12 12
|
|
|
|
if (len(data) < 4):
|
|
print("Line %d: Error: Packet too short (%d)" % (lineno, len(data)))
|
|
return (False, 0, 0, 0, 0)
|
|
if (data[0] != 0x7D):
|
|
print("Line %d: Error: Packet start is not right. %02x != 0x7D" % (lineno, data[0]))
|
|
return (False, 0, 0, 0, 0)
|
|
|
|
# Decode the packet
|
|
(ckmatch, cmd, xfer_dir, _length, payload, decode_errs) = pkt_decode(data)
|
|
|
|
# Validations
|
|
|
|
# direction
|
|
if (xfer_dir == 0x00):
|
|
direction = "reply"
|
|
to_radio = False
|
|
elif (xfer_dir == 0xFF):
|
|
direction = "command"
|
|
to_radio = True
|
|
else:
|
|
print("Line %d: Unknown direction: %02x" % (lineno, xfer_dir))
|
|
return(False, 0, 0,0,0)
|
|
|
|
# Payload length
|
|
if (_length > 0 and ((len(payload)) != _length)): #
|
|
errs += "Packet length does not match len(payload) = %d, length = %d; " % (len(payload) , _length)
|
|
if (to_radio):
|
|
if (cmd in cmd_lerrs):
|
|
cmd_lerrs[cmd] += 1
|
|
else:
|
|
cmd_lerrs[cmd] = 1
|
|
else:
|
|
if (cmd in rep_lerrs):
|
|
rep_lerrs[cmd] += 1
|
|
else:
|
|
rep_lerrs[cmd] = 1
|
|
|
|
#Checksum match
|
|
if (not ckmatch):
|
|
errs += decode_errs
|
|
if (to_radio):
|
|
if (cmd in cmd_cerrs):
|
|
cmd_cerrs[cmd] += 1
|
|
else:
|
|
cmd_cerrs[cmd] = 1
|
|
else:
|
|
if (cmd in rep_cerrs):
|
|
rep_cerrs[cmd] += 1
|
|
else:
|
|
rep_cerrs[cmd] = 1
|
|
|
|
# Count commands and replys by type
|
|
if (to_radio):
|
|
if (cmd in cmd_cnt):
|
|
cmd_cnt[cmd] += 1
|
|
else:
|
|
cmd_cnt[cmd] = 1
|
|
else:
|
|
if (cmd in rep_cnt):
|
|
rep_cnt[cmd] += 1
|
|
else:
|
|
rep_cnt[cmd] = 1
|
|
|
|
# Decode packets
|
|
print("Line %d: %s %s (%d) %s" % (lineno, cmd_name[cmd], direction, _length, errs))
|
|
if (cmd == 0x80): # Hello, ident yourself
|
|
if (to_radio):
|
|
if(_length != 0):
|
|
print("malformed ident query")
|
|
else: # Hello, ident yourself
|
|
display_ident(payload)
|
|
|
|
elif (cmd == 0x81): # Hang up. We are done. No reply from radio
|
|
if (to_radio):
|
|
if(_length != 0):
|
|
print("malformed ident query")
|
|
else: # should get no hangup reply
|
|
print("Error: radio should reboot, not reply!")
|
|
else: # these commands move data
|
|
address = payload[0]<<8 | payload[1]
|
|
if (cmd == 0x82): # Read config
|
|
if to_radio:
|
|
count = payload[2]
|
|
print("config item %s (0x%x), fetch %d" % (conf_item_name(address), address, count))
|
|
else:
|
|
item_value = payload[2:]
|
|
display_config(address, item_value)
|
|
|
|
elif (cmd == 0x83): # write config
|
|
if to_radio:
|
|
item_value = payload[2:]
|
|
display_config(address, item_value)
|
|
else:
|
|
print("%s: OK" % conf_item_name(address))
|
|
|
|
elif (cmd == 0x84): # Read channel memory
|
|
if to_radio:
|
|
count = payload[2]
|
|
print("EEPROM addr 0x%x, fetch %d" % (address, count))
|
|
else:
|
|
eeprom_contents = payload[2:]
|
|
display_chan_blk(address, eeprom_contents)
|
|
|
|
elif (cmd == 0x85): # Write channel memory
|
|
if to_radio:
|
|
eeprom_contents = payload[2:]
|
|
display_chan_blk(address, eeprom_contents)
|
|
else:
|
|
print("%s: OK" % conf_item_name(address))
|
|
|
|
else:
|
|
print("Unknown %s command: %x\n[%i]\n%s" % (direction, cmd, len(payload), hexprint(payload)))
|
|
|
|
print("\n------------------ end of line ---------------------------")
|
|
# valid record
|
|
return (True, cmd, direction, _length, payload)
|
|
|
|
def l2b(data):
|
|
"""Line lo bytes string safely.
|
|
using chr() introduces unicode code point conversions.
|
|
We use (force) real byte (integer) types because duck typing of strings bite back."""
|
|
# convert the data in string format to unicode safe byte format
|
|
|
|
return bytearray.fromhex(data)
|
|
|
|
|
|
def dump(lineno, data):
|
|
(valid, cmd, direction, size, payload) = _read_record(lineno, l2b(data))
|
|
|
|
global cmd_cnt
|
|
global rep_cnt
|
|
global cmd_cerrs
|
|
global rep_cerrs
|
|
global cmd_lerrs
|
|
global rep_lerrs
|
|
|
|
# packet validation
|
|
if (valid == False):
|
|
# invalid packet, finish
|
|
print("Invalid packet at line %d" % lineno)
|
|
elif (cmd == 0x81): # roll up stats on hangup
|
|
print("At hangup at line %d. command stats\n # cmds cksm length" % lineno)
|
|
for cmd in list(cmd_cnt):
|
|
if (cmd in cmd_cerrs):
|
|
cerrs = cmd_cerrs[cmd]
|
|
else:
|
|
cerrs = 0
|
|
if (cmd in cmd_lerrs):
|
|
lerrs = cmd_lerrs[cmd]
|
|
else:
|
|
lerrs = 0
|
|
print("command %12s: %4d, %4d, %4d" % (cmd_name[cmd], cmd_cnt[cmd], cerrs, lerrs))
|
|
for cmd in list(rep_cnt):
|
|
if (cmd in rep_cerrs):
|
|
cerrs = rep_cerrs[cmd]
|
|
else:
|
|
cerrs = 0
|
|
if (cmd in rep_lerrs):
|
|
lerrs = rep_lerrs[cmd]
|
|
else:
|
|
lerrs = 0
|
|
print("reply %12s: %4d, %4d, %4d" % (cmd_name[cmd], rep_cnt[cmd], cerrs, lerrs))
|
|
cmd_cnt = {}
|
|
rep_cnt = {}
|
|
cmd_cerrs = {}
|
|
rep_cerrs = {}
|
|
cmd_lerrs = {}
|
|
rep_lerrs = {}
|
|
print("========================= end of session ====================================");
|
|
return cmd
|
|
|
|
|
|
# run it on the same folder as the data_down_clean.txt file
|
|
|
|
with open(sys.argv[1]) as f:
|
|
linenum = 1
|
|
for line in f:
|
|
line = line.replace('\n', '') # chomp
|
|
# drop empty lines and lines w/out the magic tag.
|
|
if (len(line) > 0 and line[0] in 'WR' and line[1] == ":"):
|
|
comm = dump(linenum, line[3:])
|
|
linenum = linenum + 1
|
|
|
|
## # dump just a few packets
|
|
## if (linenum > 3):
|
|
## break
|
|
|