Project

General

Profile

New Model #4129 » thd74.py

TH-D74 driver - Angus Ainslie, 06/21/2020 09:32 AM

 
import logging
import struct
import binascii

import time

from chirp import directory, bitwise, errors, chirp_common, memmap
from chirp.settings import RadioSettingGroup, RadioSetting, RadioSettings, \
RadioSettingValueInteger, RadioSettingValueString, \
RadioSettingValueList, RadioSettingValueBoolean, \
InvalidValueError

from . import thd72
from chirp.util import hexprint

LOG = logging.getLogger(__name__)

# Save files from MCP-D74 have a 256-byte header, and possibly some oddness
# TH-D74 memory map

# 0x02000: memory flags, 4 bytes per memory
# 0x04000: memories, each 40 bytes long
# 0x10000: names, each 16 bytes long, null padded, ascii encoded

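# memory channel (40 bytes, field order as declared in the "mem" struct of
# mem_format below; byte offsets in hex)
# 00-03 freq | 04-07 offset | 08 tuning_step/unk | 09 mode/unk1
# 0a tone_mode/duplex | 0b rtone | 0c ctone | 0d dtcs
# 0e cross_mode/digital_squelch | 0f-16 urcall | 17-1e rpt1 | 1f-26 rpt2
# 27 digital_squelch_code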

# frequency is 32-bit unsigned little-endian Hz

# Band limits as (start, end) pairs in Hz, written here in MHz and scaled.
# NOTE: this name is bound again further down the module, so the later
# definition is the one in effect once the module has finished loading.
DEFAULT_PROG_VFO = tuple(
    (low_mhz * 1000000, high_mhz * 1000000)
    for low_mhz, high_mhz in (
        (136, 174),
        (410, 470),
        (118, 136),
        (136, 174),
        (320, 400),
        (400, 524),
    )
)

# Some of the blocks written by MCP have a length shorter than the usual
# 256 bytes (which is sent as 0x00). Maps block number -> byte count.
BLOCK_SIZES = {
    0x0003: 0x00B4,
    0x0007: 0x0068,
}

# Memory-map description handed to bitwise.parse() by the radio class.
# Each "#seekto" sets the absolute offset of the struct that follows.
# One "memory" entry is six 40-byte channel records plus 16 pad bytes.
# Fix: the two bitfields sharing the cross_mode byte are now separated by a
# comma, like every other bitfield pair in this description.
mem_format = """
// TODO: find lockout
struct {
u8 variant;
u8 region;
} radio_type;

#seekto 0x390;
struct {
u8 ukn[14];
u8 empty; // no stations 0xFF, staions 0x0
u8 ukn2;
ul32 tuned_station;
} current_fm_radio;

#seekto 0x1040;
struct {
u8 enable; // FM radio on 0x01 off 0x00
u8 seconds;
} fm_radio_settings;

#seekto 0x1088;
struct {
u8 dist_units; // 0 - mile, 1 - KM
u8 rain_units; // 0 - inch, 1 - mm
u8 temp_units; // 0 - F, 1 - C
} unit_settings;

#seekto 0x10c0;
struct {
char power_on_msg[16];
char modem_name[16];
} onmsg_name;

#seekto 0x1100;
struct {
u8 enable; // gps on 0x01 gps off 0x00
} gps_settings;

#seekto 0x1200;
struct {
char callsign[8];
} callsign;

#seekto 0x02000;
struct {
// 4 bytes long
u8 disabled;
u8 unk;
u8 group;
u8 unk2;
} flag[1032];

#seekto 0x04000;
// TODO: deal with the 16-byte trailers of every block
struct {
struct {
ul32 freq;
ul32 offset;
u8 tuning_step:4,
unk:4;
u8 mode:4,
unk1:4;
u8 tone_mode:4,
duplex:4;
u8 rtone;
u8 ctone;
u8 dtcs;
u8 cross_mode:4,
digital_squelch:4;
char urcall[8];
char rpt1[8];
char rpt2[8];
u8 digital_squelch_code;
} mem[6];
u8 pad[16];
} memory[172]; // TODO: correct number of memories

#seekto 0x4cd00;
struct {
ul32 freq;
char name[16];
} fm_radio_memory[10];

#seekto 0x10000;
struct {
char name[16];
} channel_name[1000];

#seekto 0x14700;
struct {
char name[16];
} wx_name[10];

#seekto 0x144d0;
struct {
char name[16];
} call_name[6];

#seekto 0x14800;
struct {
char name[16];
} group_name[31];
"""

# Tuning steps indexed by the radio's step code; None marks codes with no
# step assigned in this table.
STEPS = [
    5.0, 6.25, None, None,
    10.0, 12.5, 15.0, 20.0,
    25.0, 50.0, 100.0, 9.0,
]

# Mode names indexed by the radio's mode code.
MODES = ["FM", "DV", "AM", "LSB", "USB", "CW", "NFM", "DV"]

# Reverse lookup, mode name -> mode code.
# NOTE(review): "DV" appears at codes 1 and 7 in MODES, so the later entry
# wins and "DV" maps to 7 here - confirm that is the intended code to write.
MODES_REV = {mode_name: mode_code for mode_code, mode_name in enumerate(MODES)}

# Band limits as (start, end) pairs in Hz, written here in kHz and scaled.
# This rebinds the DEFAULT_PROG_VFO name assigned earlier in the module;
# get_prog_vfo() below searches this table in order.
DEFAULT_PROG_VFO = tuple(
    (low_khz * 1000, high_khz * 1000)
    for low_khz, high_khz in (
        (136000, 174000),
        (216000, 259000),
        (410000, 470000),
        (540, 3500),
        (3500, 5100),
        (51000, 87000),
        (87000, 108000),
        (118000, 136000),
        (136000, 174000),
        (216000, 259000),
        (400000, 524000),
        (400000, 524000),
    )
)

def get_prog_vfo(frequency):
    """Return the index of the first DEFAULT_PROG_VFO band holding frequency.

    A band matches when ``start <= frequency < end``; bands are tried in
    table order, so overlapping or repeated bands resolve to the earliest.

    Raises:
        ValueError: if no band in DEFAULT_PROG_VFO contains the frequency.
    """
    for index, band in enumerate(DEFAULT_PROG_VFO):
        low, high = band
        if not (low <= frequency < high):
            continue
        return index
    raise ValueError("Frequency is out of range.")

def hex(data):
    """Format data as an uppercase hex dump, 16 bytes per line.

    NOTE: the name shadows the builtin ``hex`` inside this module; it is
    kept because SProxy calls it under this name.

    Args:
        data: bytes/bytearray, or a str whose characters stand for raw byte
            values. Any other object is dumped via its ``str()`` form.

    Returns:
        Two-digit hex values separated by single spaces, rows separated by
        newlines, with no leading or trailing whitespace. Empty input gives
        an empty string.
    """
    if isinstance(data, (bytes, bytearray)):
        # Dump the payload itself; str(bytes) would give the b'..' repr.
        raw = bytearray(data)
    else:
        text = str(data)
        try:
            # latin-1 maps code points 0-255 straight onto byte values.
            raw = bytearray(text.encode("latin-1"))
        except UnicodeEncodeError:
            raw = bytearray(text.encode("utf8"))

    rows = []
    for start in range(0, len(raw), 16):
        row = raw[start:start + 16]
        rows.append(" ".join("%02X" % value for value in row))
    return "\n".join(rows)

class SProxy(object):
    """Pass-through wrapper around a serial-like object that logs traffic.

    Every read and write is forwarded to ``delegate`` and hex-dumped to the
    module logger at DEBUG level. The ``timeout`` attribute is proxied so
    the wrapper can stand in for the delegate where a timeout is adjusted.
    """

    def __init__(self, delegate):
        """Wrap ``delegate``, which must offer read(), write() and timeout."""
        self.delegate = delegate

    def read(self, len):
        """Read up to ``len`` units from the delegate, log and return them."""
        received = self.delegate.read(len)
        LOG.debug("READ\n" + hex(received))
        return received

    def write(self, data):
        """Log ``data`` and forward it to the delegate unchanged.

        The payload is no longer passed through str(): on Python 3 that
        would replace a bytes payload with its ``b'...'`` repr.
        """
        LOG.debug("WRITE\n" + hex(data))
        return self.delegate.write(data)

    @property
    def timeout(self):
        """Timeout currently set on the delegate."""
        return self.delegate.timeout

    @timeout.setter
    def timeout(self, timeout):
        self.delegate.timeout = timeout



@directory.register
class THD74Radio(thd72.THD72Radio):
    """TH-D74 clone-mode driver built on top of the TH-D72 driver.

    The radio image is moved in 256-byte blocks using "R"/"W" commands
    after the radio has been put into PROGRAM mode. Channel data, flags,
    names and a few settings are mapped through ``mem_format``.
    """

    MODEL = "TH-D74 (clone mode)"
    # MODEL = "TH-D74"
    _memsize = 500480
    # I think baud rate might be ignored by USB-Serial stack of the D74's
    # on-board FTDI chip, but it doesn't seem to hurt.
    BAUD_RATE = 115200

    # Single-character acknowledge exchanged with the radio.
    _ACK = chr(0x06)
    # Value of flag.disabled that marks an unused memory slot.
    _FLAG_EMPTY = 0xFF

    # To log all serial traffic, wrap the pipe in SProxy:
    # def __init__(self, pipe):
    #     pipe = SProxy(pipe)
    #     super(THD74Radio, self).__init__(pipe)

    def get_features(self):
        """Return the parent driver's features adjusted for this radio."""
        rf = super(THD74Radio, self).get_features()
        rf.valid_bands = [(118000000, 174000000),
                          (216000000, 260000000),
                          (320000000, 524000000)]
        rf.valid_modes = list(MODES_REV.keys())
        rf.has_tuning_step = True
        rf.valid_name_length = 16
        return rf

    def process_mmap(self):
        """Parse the raw image with mem_format and reset dirty tracking."""
        self._memobj = bitwise.parse(mem_format, self._mmap)
        self._dirty_blocks = []

    def sync_in(self):
        """Download the full image from the radio and parse it."""
        # self._detect_baud()
        self._mmap = self.download()
        self.process_mmap()

    def sync_out(self):
        """Upload the dirty blocks, or the default block set if none."""
        if self._dirty_blocks:
            self.upload(self._dirty_blocks)
        else:
            self.upload()

    def read_block(self, block, count=256):
        """Read ``count`` bytes of block number ``block`` from the radio.

        Sends an "R" request, checks the "W" response header echoes the
        block number, reads the payload and exchanges the closing ACK.

        Raises:
            Exception: on a short header, a mismatched header or a missing
                post-block ACK.
            errors.RadioError: if the radio stops sending payload data.
        """
        # A count of 256 is sent as 0 (count % 256).
        request = struct.pack(">cHH", b"R", block, count % 256)
        self.pipe.write(''.join(chr(b) for b in request))

        header = self.pipe.read(5)
        if len(header) != 5:
            raise Exception("Did not receive block response")

        cmd = header[0]
        _block = (ord(header[1]) << 8) + ord(header[2])
        if cmd != 'W' or _block != block:
            raise Exception("Invalid response: %s %i %i" %
                            (cmd, block, _block))

        data = ""
        while len(data) < count:
            chunk = self.pipe.read(count - len(data))
            if not chunk:
                # An empty read means a timeout; looping again would spin
                # forever without making progress.
                raise errors.RadioError(
                    "Radio stopped responding while reading block %i" %
                    block)
            data += chunk

        self.pipe.write(self._ACK)
        if self.pipe.read(1) != self._ACK:
            raise Exception("Did not receive post-block ACK!")

        return data

    def write_block(self, block, map, count=256):
        """Write ``count`` bytes of block ``block`` taken from ``map``.

        The payload is ``map[block * 256:block * 256 + count]``. If no ACK
        arrives, one more read is attempted after a 0.5 s pause.

        Returns:
            True when the radio acknowledged the block, False otherwise.
        """
        header = struct.pack(">cHH", b"W", block, count % 256)
        base = block * 256
        data = map[base:base + count]
        # It's crucial that these are written together. Otherwise the radio
        # will fail to ACK under some conditions.
        self.pipe.write(''.join(chr(b) for b in header) + data)

        ack = self.pipe.read(1)
        if not ack:
            LOG.warning("write timed out block %d - trying again", block)
            time.sleep(0.5)
            ack = self.pipe.read(1)

        if ack == self._ACK:
            return True

        if ack:
            LOG.error("Block %d write failed %d", block, ord(ack))
        else:
            # ord() cannot be applied to an empty response.
            LOG.error("Block %d write failed: no response", block)
        return False

    def _expect_ack(self):
        """Read one character and raise RadioError unless it is an ACK."""
        ack = self.pipe.read(1)
        if ack == self._ACK:
            return
        if not ack:
            raise errors.RadioError("Expected ack but got no response")
        raise errors.RadioError(
            "Expected ack but got {} ({})".format(ord(ack), type(ack)))

    def _unlock(self):
        """Voodoo sequence of operations to get the radio to accept our
        programming.

        Returns None; any failure is reported by raising
        errors.RadioError.
        """
        # The data read here is not used; only the exchange itself matters.
        self.read_block(0, 6)

        unlock = (
            "\x57\x00\x00\x00\x30\xff\x01\xff\x00\x00\xff\xff\xff\xff\xff\x01"
            "\x00\x00\x00\x03\x01\x00\x00\x00\x00\x02\x00\x30\x30\x30\x00\xff"
            "\xff\xff\xff\xff\xff\xff\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff"
            "\xff\xff\xff\xff\xff")
        self.pipe.write(unlock)
        self._expect_ack()

        header = struct.pack(">cHH", b"W", 0, 0x38C8)
        self.pipe.write(''.join(chr(b) for b in header))
        # magic unlock sequence
        self.pipe.write("\xff" * 8 + "\x00" * 160 + "\xff" * 32)

        time.sleep(0.01)
        self._expect_ack()

    def download(self, raw=False, blocks=None):
        """Read the radio image, optionally limited to selected blocks.

        Args:
            raw: when true, return the image as a plain string instead of
                a memmap.MemoryMap.
            blocks: block numbers to read; blocks not listed are filled
                with 0xff bytes. None reads every block.

        Raises:
            errors.RadioError: if the radio does not enter PROGRAM mode.
        """
        total_blocks = self._memsize // 256
        if blocks is None:
            blocks = list(range(total_blocks))
        else:
            blocks = [b for b in blocks if b < total_blocks]

        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
            raise errors.RadioError("Radio didn't go into PROGRAM mode")

        self.pipe.baudrate = 57600
        try:
            self.pipe.setRTS()
        except AttributeError:
            # Fall back to the attribute form when setRTS() is missing.
            self.pipe.rts = True
        self.pipe.read(1)

        if blocks:
            LOG.debug("reading blocks %d..%d" % (blocks[0], blocks[-1]))

        wanted = set(blocks)
        total = len(blocks)
        count = 0
        pieces = []
        filler = 256 * '\xff'
        for i in range(total_blocks):
            if i not in wanted:
                pieces.append(filler)
                continue
            pieces.append(self.read_block(i))
            count += 1
            if self.status_fn:
                s = chirp_common.Status()
                s.msg = "Cloning from radio"
                s.max = total
                s.cur = count
                self.status_fn(s)

        self.pipe.write("E")

        data = "".join(pieces)
        if raw:
            return data
        return memmap.MemoryMap(data)

    def upload(self, blocks=None):
        """Write blocks of the current image to the radio.

        Args:
            blocks: block numbers to write. None writes every block except
                the last two. Blocks 0 and 1279 are always skipped.

        Raises:
            errors.RadioError: if PROGRAM mode, the unlock sequence or any
                block write fails.
        """
        # MCP-D74 sets DTR, so we should too
        try:
            self.pipe.setDTR()
        except AttributeError:
            self.pipe.dtr = True

        total_blocks = self._memsize // 256
        if blocks is None:
            blocks = list(range(total_blocks - 2))
        else:
            blocks = [b for b in blocks if b < total_blocks]

        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
            raise errors.RadioError("Radio didn't go into PROGRAM mode")

        # _unlock() raises RadioError itself when the radio refuses.
        self._unlock()

        # Block 0 definitely isn't written conventionally, so we let _unlock
        # handle it and skip.
        # For some reason MCP-D74 skips block 1279. If we don't, we'll get a
        # NACK on the next one. There is also a more than 500 ms delay for
        # the ACK.
        blocks = [b for b in blocks if b not in (0, 1279)]

        if blocks:
            LOG.debug("writing blocks %d..%d", blocks[0], blocks[-1])

        total = len(blocks)
        count = 0
        for i in blocks:
            time.sleep(0.001)
            ok = self.write_block(i, self._mmap, BLOCK_SIZES.get(i, 256))
            count += 1
            if not ok:
                raise errors.RadioError("write of block %i failed" % i)
            if self.status_fn:
                s = chirp_common.Status()
                s.msg = "Cloning to radio"
                s.max = total
                s.cur = count
                self.status_fn(s)

        # lock = ("\x57\x00\x00\x00\x06\x02\x01\xff\x00\x00\xff")
        # unmodified variant 2
        radio_type = self._memobj.radio_type
        lock = ("\x57\x00\x00\x00\x06%c%c\xff\x00\x00\xff" %
                (chr(int(radio_type.variant)), chr(int(radio_type.region))))

        LOG.debug("Locking radio %r", lock)

        self.pipe.write(lock)

        self.pipe.write("F")
        # clear out blocks we uploaded from the dirty blocks list
        uploaded = set(blocks)
        self._dirty_blocks = [b for b in self._dirty_blocks
                              if b not in uploaded]

    def command(self, cmd, response_length, timeout=0.5):
        """Send a text command and return the stripped response.

        The pipe timeout is set to ``timeout`` for the read and restored
        afterwards, even if the read raises.
        """
        LOG.debug("PC->D72: %s" % cmd)
        default_timeout = self.pipe.timeout
        self.pipe.write(cmd + "\r")
        self.pipe.timeout = timeout
        try:
            data = self.pipe.read(response_length + 1)
            LOG.debug("D72->PC: %s" % data.strip())
        finally:
            self.pipe.timeout = default_timeout
        return data.strip()

    def get_raw_memory(self, number):
        """Return the repr of the channel record and flag for ``number``."""
        bank, idx = divmod(number, 6)
        _mem = self._memobj.memory[bank].mem[idx]
        return repr(_mem) + repr(self._memobj.flag[number])

    def get_id(self):
        """Return the second space-separated field of the "ID" response.

        Raises:
            errors.RadioError: if the response does not start with "ID ".
        """
        r = self.command("ID", 9)
        if not r.startswith("ID "):
            raise errors.RadioError("No response to ID command")
        return r.split(" ")[1]

    def set_channel_name(self, number, name):
        """Store ``name`` (NUL padded/truncated to 16) for a channel.

        Channels below 999 use channel_name; 1020-1029 use wx_name. Any
        other number is ignored.
        """
        padded = self._add_00_pad(name, 16)
        if number < 999:
            self._memobj.channel_name[number].name = padded
            self.add_dirty_block(self._memobj.channel_name[number])
        elif 1020 <= number < 1030:
            number -= 1020
            self._memobj.wx_name[number].name = padded
            self.add_dirty_block(self._memobj.wx_name[number])

    def get_memory(self, number):
        """Build a chirp Memory (or DVMemory) for channel ``number``.

        ``number`` may also be a special-channel name found in
        thd72.THD72_SPECIAL.

        Raises:
            errors.InvalidMemoryLocation: for an unknown name or a number
                outside the supported range.
        """
        if isinstance(number, str):
            try:
                number = thd72.THD72_SPECIAL[number]
            except KeyError:
                raise errors.InvalidMemoryLocation("Unknown channel %s" %
                                                   number)

        if number < 0 or number > (max(thd72.THD72_SPECIAL.values()) + 1):
            raise errors.InvalidMemoryLocation(
                "Number must be between 0 and 999")

        bank, idx = divmod(number, 6)
        _mem = self._memobj.memory[bank].mem[idx]
        flag = self._memobj.flag[number]

        if _mem.mode < len(MODES) and MODES[_mem.mode] == "DV":
            mem = chirp_common.DVMemory()
        else:
            mem = chirp_common.Memory()

        mem.number = number

        if number > 999:
            mem.extd_number = thd72.THD72_SPECIAL_REV[number]
        if flag.disabled == self._FLAG_EMPTY:
            mem.empty = True
            return mem

        mem.name = self.get_channel_name(number)
        mem.freq = int(_mem.freq)
        mem.tmode = thd72.TMODES[int(_mem.tone_mode)]
        mem.rtone = chirp_common.TONES[_mem.rtone]
        mem.ctone = chirp_common.TONES[_mem.ctone]
        mem.dtcs = chirp_common.DTCS_CODES[_mem.dtcs]
        mem.duplex = thd72.DUPLEX[int(_mem.duplex)]
        # Copy plain values so the Memory does not alias the image.
        mem.offset = int(_mem.offset)
        mem.mode = MODES[int(_mem.mode)]
        if _mem.tuning_step < len(STEPS):
            mem.tuning_step = STEPS[_mem.tuning_step]
        else:
            mem.tuning_step = 0xff

        if mem.mode == "DV":
            mem.dv_urcall = str(_mem.urcall)
            mem.dv_rpt1call = str(_mem.rpt1)
            mem.dv_rpt2call = str(_mem.rpt2)
            mem.dv_code = int(_mem.digital_squelch_code)

        # NOTE(review): channel 999 falls into neither branch below -
        # confirm whether "< 999" should be "<= 999".
        if number < 999:
            # mem.skip = chirp_common.SKIP_VALUES[int(flag.skip)]
            mem.cross_mode = chirp_common.CROSS_MODES[_mem.cross_mode]
        if number > 999:
            mem.cross_mode = chirp_common.CROSS_MODES[0]
            mem.immutable = ["number", "bank", "extd_number", "cross_mode"]
            if 1020 <= number < 1030:
                mem.immutable += ["freq", "offset", "tone", "mode",
                                  "tmode", "ctone", "skip"]  # FIXME: ALL
            else:
                mem.immutable += ["name"]

        return mem

    def set_memory(self, mem):
        """Write ``mem`` into the image and mark the touched blocks dirty.

        Channels 1020-1029 only accept a name change. An empty memory is
        stored by setting the slot's flag to the same marker that
        get_memory() tests for.

        Raises:
            errors.InvalidMemoryLocation: if mem.number is out of range.
        """
        LOG.debug("set_memory(%d)" % mem.number)
        if mem.number < 0 or \
                mem.number > (max(thd72.THD72_SPECIAL.values()) + 1):
            raise errors.InvalidMemoryLocation(
                "Number must be between 0 and 999")

        # weather channels can only change name, nothing else
        if 1020 <= mem.number < 1030:
            self.set_channel_name(mem.number, mem.name)
            return

        flag = self._memobj.flag[mem.number]
        self.add_dirty_block(flag)

        # only delete non-WX channels
        was_empty = flag.disabled == self._FLAG_EMPTY
        if mem.empty:
            flag.disabled = self._FLAG_EMPTY
            return
        flag.disabled = 0

        bank, idx = divmod(mem.number, 6)
        _mem = self._memobj.memory[bank].mem[idx]
        self.add_dirty_block(_mem)
        if was_empty:
            self.initialize(_mem)

        _mem.freq = mem.freq

        if mem.number < 999:
            self.set_channel_name(mem.number, mem.name)

        _mem.tone_mode = thd72.TMODES_REV[mem.tmode]
        _mem.rtone = chirp_common.TONES.index(mem.rtone)
        _mem.ctone = chirp_common.TONES.index(mem.ctone)
        _mem.dtcs = chirp_common.DTCS_CODES.index(mem.dtcs)
        _mem.cross_mode = chirp_common.CROSS_MODES.index(mem.cross_mode)
        _mem.duplex = thd72.DUPLEX_REV[mem.duplex]
        _mem.offset = mem.offset
        _mem.mode = MODES_REV[mem.mode]

        # The unused thd72.get_prog_vfo(mem.freq) lookup was dropped: its
        # result was never stored.
        # NOTE(review): it presumably validated against the TH-D72 band
        # table, which would reject bands get_features() allows - confirm.
        # flag.prog_vfo = prog_vfo

        # NOTE(review): tuning_step and the DV call fields are read by
        # get_memory() but not written here - confirm whether they should be.

        # if mem.number < 999:
        #     flag.skip = chirp_common.SKIP_VALUES.index(mem.skip)

    @staticmethod
    def _add_00_pad(val, length):
        """Return ``val`` NUL-padded or truncated to exactly ``length``."""
        return val.ljust(length, "\x00")[:length]

    @classmethod
    def apply_callsign(cls, setting, obj):
        """Settings callback: store the upper-cased callsign in 8 chars."""
        callsign = setting.value.get_value().upper()
        obj.callsign = cls._add_00_pad(callsign, 8)

    @classmethod
    def apply_power_on_msg(cls, setting, obj):
        """Settings callback: store the power-on message in 16 chars."""
        msg = setting.value.get_value()
        obj.power_on_msg = cls._add_00_pad(msg, 16)

    def _get_general_settings(self):
        """Build the "General" settings group from the parsed image."""
        menu = RadioSettingGroup("general", "General")
        cs = self._memobj.callsign

        # NOTE(review): the stored field is 8 characters but the setting is
        # limited to 6 - confirm which limit is intended.
        val = RadioSettingValueString(
            0, 6, str(cs.callsign).rstrip("\x00"))
        rs = RadioSetting("cs.callsign", "Callsign", val)
        rs.set_apply_callback(self.apply_callsign, cs)
        menu.append(rs)

        msg = self._memobj.onmsg_name

        val = RadioSettingValueString(
            0, 16, str(msg.power_on_msg).rstrip("\x00"))
        rs = RadioSetting("msg.power_on_msg", "Power on message", val)
        rs.set_apply_callback(self.apply_power_on_msg, msg)
        menu.append(rs)

        units = self._memobj.unit_settings
        val = RadioSettingValueBoolean(units.dist_units)
        rs = RadioSetting("unit_settings.dist_units", "distance in KM", val)
        menu.append(rs)

        # Each unit setting reads its own field (previously all three were
        # initialised from dist_units).
        val = RadioSettingValueBoolean(units.rain_units)
        rs = RadioSetting("unit_settings.rain_units", "rain in mm", val)
        menu.append(rs)

        val = RadioSettingValueBoolean(units.temp_units)
        rs = RadioSetting("unit_settings.temp_units",
                          "degrees in Centigrade", val)
        menu.append(rs)

        gpss = self._memobj.gps_settings
        val = RadioSettingValueBoolean(gpss.enable)
        rs = RadioSetting("gps_settings.enable", "enable GPS", val)
        menu.append(rs)

        return menu

    def _get_fm_radio_settings(self):
        """Build the "FM radio" settings group from the parsed image."""
        menu = RadioSettingGroup("fm_radio", "FM radio")

        # The fm_radio_memory[10] entries (ul32 freq; char name[16]) are
        # not exposed here yet.
        fms = self._memobj.fm_radio_settings

        val = RadioSettingValueBoolean(fms.enable)
        rs = RadioSetting("fm_radio_settings.enable", "Enable FM radio", val)
        menu.append(rs)

        val = RadioSettingValueInteger(3, 32, fms.seconds)
        rs = RadioSetting("fm_radio_settings.seconds", "FM radio timeout",
                          val)
        menu.append(rs)

        return menu

    def _get_settings(self):
        """Assemble the top-level settings from the individual groups."""
        return RadioSettings(self._get_general_settings(),
                             self._get_fm_radio_settings())

    def get_settings(self):
        """Return the radio settings, or None if they cannot be built.

        Failures are logged with a traceback rather than propagated.
        """
        try:
            return self._get_settings()
        except Exception:
            # Exception (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate.
            import traceback
            LOG.error("Failed to parse settings: %s", traceback.format_exc())
            return None

(9-9/10)