Project

General

Profile

New Model #4129 » thd74.py

Eric Wolak, 08/17/2019 02:45 PM

 
1
import logging
2
import struct
3

    
4
import time
5

    
6
from chirp import directory, bitwise, errors, chirp_common, memmap
7

    
8
import thd72
9
from chirp.util import hexprint
10

    
11
LOG = logging.getLogger(__name__)
12

    
13
# Save files from MCP-D74 have a 256-byte header, and possibly some oddness
14
# TH-D74 memory map
15

    
16
# 0x02000: memory flags, 4 bytes per memory
17
# 0x04000: memories, each 40 bytes long
18
# 0x10000: names, each 16 bytes long, null padded, ascii encoded
19

    
20
# memory channel
21
# 0 1 2 3  4 5     6            7     8     9    a          b c d e   f
22
# [freq ]  ? mode  tmode/duplex rtone ctone dtcs cross_mode [offset]  ?
23

    
24
# frequency is 32-bit unsigned little-endian Hz
25

    
26
# Some of the blocks written by MCP have a length set of less than 0x00/256
27
BLOCK_SIZES = {
28
    0x0003: 0x00B4,
29
    0x0007: 0x0068,
30
}
31

    
32
mem_format = """
33
// TODO: find lockout
34

    
35
#seekto 0x02000;
36
struct {
37
// 4 bytes long
38
  u8   disabled;
39
  u8   unk;
40
  u8   group;
41
  u8   unk2;
42
} flag[1000];
43

    
44
#seekto 0x04000;
45
// TODO: deal with the 16-byte trailers of every block
46
struct {
47
    struct {
48
      ul32 freq;
49
      ul32 offset;
50
      
51
      u8   tuning_step:4,
52
           unk:4;
53
      u8   mode:4,
54
           unk1:4;
55
      u8   tone_mode:4,
56
           duplex:4;
57
      u8   rtone;
58
      
59
      u8   ctone;
60
      u8   dtcs;
61
      u8   cross_mode:4
62
           digital_squelch:4;
63
      char urcall[8];
64
      char rpt1[8];
65
      char rpt2[8];
66
      
67
      u8   digital_squelch_code;
68
      
69
    } mem[6];
70
    
71
    u8 pad[16];
72
} memory[160]; // TODO: correct number of memories
73

    
74
#seekto 0x10000;
75
struct {
76
  char name[16];
77
} channel_name[1000];
78
"""
79

    
80
STEPS = [5.0, 6.25, None, None, 10.0, 12.5, 15.0, 20.0, 25.0, 50.0, 100.0, 9.0]
81
MODES = [
82
    "FM",
83
    "DV",
84
    "AM",
85
    "LSB",
86
    "USB",
87
    "CW",
88
    "NFM",
89
    "DV"
90
]
91

    
92
def hex(data):
93
    data_txt = ""
94
    for idx in range(0, len(data), 16):
95
        bytes = data[idx:idx+16].encode("hex").upper()
96
        for idx in range(0, len(bytes), 2):
97
            data_txt += bytes[idx:idx+2] + " "
98
        data_txt += "\n"
99
    return data_txt.strip()
100

    
101
class SProxy(object):
102
    def __init__(self, delegate):
103
        self.delegate = delegate
104

    
105
    def read(self, len):
106
        r = self.delegate.read(len)
107
        LOG.debug("READ\n" + hex(r))
108
        return r
109

    
110
    def write(self, data):
111
        LOG.debug("WRITE\n" + hex(data))
112
        return self.delegate.write(data)
113

    
114
    @property
115
    def timeout(self):
116
        return self.delegate.timeout
117

    
118
    @timeout.setter
119
    def timeout(self, timeout):
120
        self.delegate.timeout = timeout
121

    
122

    
123

    
124
@directory.register
125
class THD74Radio(thd72.THD72Radio):
126
    MODEL = "TH-D74 (clone mode)"
127
    _memsize = 500480
128
    # I think baud rate might be ignored by USB-Serial stack of the D74's
129
    # on-board FTDI chip, but it doesn't seem to hurt.
130
    BAUD_RATE = 115200
131

    
132
    def __init__(self, pipe):
133
        pipe = SProxy(pipe)
134
        super(THD74Radio, self).__init__(pipe)
135

    
136
    def get_features(self):
137
        rf = super(THD74Radio, self).get_features()
138
        rf.has_tuning_step = True
139
        return rf
140

    
141
    def process_mmap(self):
142
        self._memobj = bitwise.parse(mem_format, self._mmap)
143
        self._dirty_blocks = []
144

    
145
    def sync_in(self):
146
        # self._detect_baud()
147
        self._mmap = self.download()
148
        self.process_mmap()
149

    
150
    def sync_out(self):
151
        if len(self._dirty_blocks):
152
            self.upload(self._dirty_blocks)
153
        else:
154
            self.upload()
155

    
156
    def read_block(self, block, count=256):
157
        cmd = struct.pack(">cHH", "R", block, count%256)
158
        self.pipe.write(cmd)
159

    
160
        r = self.pipe.read(5)
161
        if len(r) != 5:
162
            raise Exception("Did not receive block response")
163

    
164
        cmd, _block, _ = struct.unpack(">cHH", r)
165
        if cmd != "W" or _block != block:
166
            raise Exception("Invalid response: %s %i" % (cmd, _block))
167

    
168
        data = ""
169
        while len(data) < count:
170
            data += self.pipe.read(count - len(data))
171

    
172
        self.pipe.write(chr(0x06))
173
        if self.pipe.read(1) != chr(0x06):
174
            raise Exception("Did not receive post-block ACK!")
175

    
176
        return data
177

    
178
    def write_block(self, block, map, count=256):
179
        c = struct.pack(">cHH", "W", block, count%256)
180
        base = block * 256
181
        data = map[base:base + count]
182
        # It's crucial that these are written together. Otherwise the radio
183
        # will fail to ACK under some conditions.
184
        self.pipe.write(c + data)
185

    
186
        ack = self.pipe.read(1)
187
        return ack == chr(0x06)
188

    
189
    def _unlock(self):
190
        """Voodoo sequence of operations to get the radio to accept our programming."""
191

    
192
        h = self.read_block(0, 6)
193
        c = struct.pack(">cHH", "W", 0, 0x30)
194
        self.pipe.write(c)
195
        # magic unlock sequence
196
        unlock = ("\xFF\x01\xFF\x00\x00\xFF\xFF\xFF\xFF\xFF\x01\xFF\xFF\xFF\x03\x01" +
197
                  "\x00\x00\x00\x00\x02\x00\x30\x30\x30\x00\xFF\xFF\xFF\xFF\xFF\xFF" +
198
                  "\xFF\x00\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF")
199
        self.pipe.write(unlock)
200

    
201
        ack = self.pipe.read(1)
202

    
203
        if ack != chr(0x06):
204
            raise errors.RadioError("Expected ack but got {} ({})".format(ord(ack), type(ack)))
205

    
206
        c = struct.pack(">cHH", "W", 0, 0x38C8)
207
        self.pipe.write(c)
208
        # magic unlock sequence
209
        unlock = [0xFF] * 8 + [0] * 160 + [0xFF] * 32
210
        unlock = "".join([chr(x) for x in unlock])
211
        self.pipe.write(unlock)
212

    
213
        time.sleep(0.01)
214
        ack = self.pipe.read(1)
215

    
216
        if ack != chr(0x06):
217
            raise errors.RadioError("Expected ack but got {} ({})".format(ord(ack), type(ack)))
218

    
219
    def download(self, raw=False, blocks=None):
220
        if blocks is None:
221
            blocks = range(self._memsize / 256)
222
        else:
223
            blocks = [b for b in blocks if b < self._memsize / 256]
224

    
225
        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
226
            raise errors.RadioError("Radio didn't go into PROGRAM mode")
227

    
228
        allblocks = range(self._memsize / 256)
229
        self.pipe.baudrate = 57600
230
        try:
231
            self.pipe.setRTS()
232
        except AttributeError:
233
            self.pipe.rts = True
234
        self.pipe.read(1)
235
        data = ""
236
        LOG.debug("reading blocks %d..%d" % (blocks[0], blocks[-1]))
237
        total = len(blocks)
238
        count = 0
239
        for i in allblocks:
240
            if i not in blocks:
241
                data += 256 * '\xff'
242
                continue
243
            data += self.read_block(i)
244
            count += 1
245
            if self.status_fn:
246
                s = chirp_common.Status()
247
                s.msg = "Cloning from radio"
248
                s.max = total
249
                s.cur = count
250
                self.status_fn(s)
251

    
252
        self.pipe.write("E")
253

    
254
        if raw:
255
            return data
256
        return memmap.MemoryMap(data)
257

    
258
    def upload(self, blocks=None):
259
        # MCP-D74 sets DTR, so we should too
260
        try:
261
            self.pipe.setDTR()
262
        except AttributeError:
263
            self.pipe.dtr = True
264

    
265
        if blocks is None:
266
            blocks = range((self._memsize / 256) - 2)
267
        else:
268
            blocks = [b for b in blocks if b < self._memsize / 256]
269

    
270
        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
271
            raise errors.RadioError("Radio didn't go into PROGRAM mode")
272

    
273
        if self._unlock():
274
            raise errors.RadioError("Unlock failed")
275

    
276
        # This block definitely isn't written conventionally, so we let _unlock
277
        # handle it and skip.
278
        blocks.remove(0)
279

    
280
        # For some reason MCP-D74 skips this block. If we don't, we'll get a NACK
281
        # on the next one.
282
        blocks.remove(1279)
283

    
284
        LOG.debug("writing blocks %d..%d" % (blocks[0], blocks[-1]))
285
        total = len(blocks)
286
        count = 0
287
        for i in blocks:
288
            time.sleep(0.001)
289
            r = self.write_block(i, self._mmap, BLOCK_SIZES.get(i, 256))
290
            count += 1
291
            if not r:
292
                raise errors.RadioError("write of block %i failed" % i)
293
            if self.status_fn:
294
                s = chirp_common.Status()
295
                s.msg = "Cloning to radio"
296
                s.max = total
297
                s.cur = count
298
                self.status_fn(s)
299

    
300
        self.pipe.write("F")
301
        # clear out blocks we uploaded from the dirty blocks list
302
        self._dirty_blocks = [b for b in self._dirty_blocks if b not in blocks]
303

    
304
    def command(self, cmd, response_length, timeout=0.5):
305
        start = time.time()
306

    
307
        LOG.debug("PC->D72: %s" % cmd)
308
        default_timeout = self.pipe.timeout
309
        self.pipe.write(cmd + "\r")
310
        self.pipe.timeout = timeout
311
        try:
312
            data = self.pipe.read(response_length + 1)
313
            LOG.debug("D72->PC: %s" % data.strip())
314
        finally:
315
            self.pipe.timeout = default_timeout
316
        return data.strip()
317

    
318
    def get_raw_memory(self, number):
319
        bank = number // 6
320
        idx = number % 6
321

    
322
        _mem = self._memobj.memory[bank].mem[idx]
323
        return repr(_mem) + \
324
               repr(self._memobj.flag[number])
325

    
326
    def get_id(self):
327
        r = self.command("ID", 9)
328
        if r.startswith("ID "):
329
            return r.split(" ")[1]
330
        else:
331
            raise errors.RadioError("No response to ID command")
332

    
333
    def get_memory(self, number):
334
        if isinstance(number, str):
335
            try:
336
                number = thd72.THD72_SPECIAL[number]
337
            except KeyError:
338
                raise errors.InvalidMemoryLocation("Unknown channel %s" %
339
                                                   number)
340

    
341
        if number < 0 or number > (max(thd72.THD72_SPECIAL.values()) + 1):
342
            raise errors.InvalidMemoryLocation(
343
                "Number must be between 0 and 999")
344

    
345
        bank = number // 6
346
        idx = number % 6
347

    
348
        _mem = self._memobj.memory[bank].mem[idx]
349
        flag = self._memobj.flag[number]
350

    
351
        if MODES[_mem.mode] == "DV":
352
            mem = chirp_common.DVMemory()
353
        else:
354
            mem = chirp_common.Memory()
355

    
356
        mem.number = number
357

    
358
        if number > 999:
359
            mem.extd_number = thd72.THD72_SPECIAL_REV[number]
360
        if flag.disabled == 0xFF:
361
            mem.empty = True
362
            return mem
363

    
364
        mem.name = self.get_channel_name(number)
365
        mem.freq = int(_mem.freq)
366
        mem.tmode = thd72.TMODES[int(_mem.tone_mode)]
367
        mem.rtone = chirp_common.TONES[_mem.rtone]
368
        mem.ctone = chirp_common.TONES[_mem.ctone]
369
        mem.dtcs = chirp_common.DTCS_CODES[_mem.dtcs]
370
        mem.duplex = thd72.DUPLEX[int(_mem.duplex)]
371
        mem.offset = _mem.offset
372
        mem.mode = MODES[int(_mem.mode)]
373
        mem.tuning_step = STEPS[_mem.tuning_step]
374

    
375
        if mem.mode == "DV":
376
            mem.dv_urcall = _mem.urcall
377
            mem.dv_rpt1call = _mem.rpt1
378
            mem.dv_rpt2call = _mem.rpt2
379
            mem.dv_code = _mem.digital_squelch_code
380

    
381
        if number < 999:
382
            # mem.skip = chirp_common.SKIP_VALUES[int(flag.skip)]
383
            mem.cross_mode = chirp_common.CROSS_MODES[_mem.cross_mode]
384
        if number > 999:
385
            mem.cross_mode = chirp_common.CROSS_MODES[0]
386
            mem.immutable = ["number", "bank", "extd_number", "cross_mode"]
387
            if number >= 1020 and number < 1030:
388
                mem.immutable += ["freq", "offset", "tone", "mode",
389
                                  "tmode", "ctone", "skip"]  # FIXME: ALL
390
            else:
391
                mem.immutable += ["name"]
392

    
393
        return mem
394

    
395
    def set_memory(self, mem):
396
        LOG.debug("set_memory(%d)" % mem.number)
397
        if mem.number < 0 or mem.number > (max(thd72.THD72_SPECIAL.values()) + 1):
398
            raise errors.InvalidMemoryLocation(
399
                "Number must be between 0 and 999")
400

    
401
        # weather channels can only change name, nothing else
402
        if mem.number >= 1020 and mem.number < 1030:
403
            self.set_channel_name(mem.number, mem.name)
404
            return
405

    
406
        flag = self._memobj.flag[mem.number]
407
        self.add_dirty_block(self._memobj.flag[mem.number])
408

    
409
        # only delete non-WX channels
410
        was_empty = flag.disabled == 0xf
411
        if mem.empty:
412
            flag.disabled = 0xf
413
            return
414
        flag.disabled = 0
415

    
416
        _mem = self._memobj.memory[mem.number]
417
        self.add_dirty_block(_mem)
418
        if was_empty:
419
            self.initialize(_mem)
420

    
421
        _mem.freq = mem.freq
422

    
423
        if mem.number < 999:
424
            self.set_channel_name(mem.number, mem.name)
425

    
426
        _mem.tone_mode = thd72.TMODES_REV[mem.tmode]
427
        _mem.rtone = chirp_common.TONES.index(mem.rtone)
428
        _mem.ctone = chirp_common.TONES.index(mem.ctone)
429
        _mem.dtcs = chirp_common.DTCS_CODES.index(mem.dtcs)
430
        _mem.cross_mode = chirp_common.CROSS_MODES.index(mem.cross_mode)
431
        _mem.duplex = thd72.DUPLEX_REV[mem.duplex]
432
        _mem.offset = mem.offset
433
        _mem.mode = thd72.MODES_REV[mem.mode]
434

    
435
        prog_vfo = thd72.get_prog_vfo(mem.freq)
436
        flag.prog_vfo = prog_vfo
437
        _mem.unknown1 = _mem.unknown2 = thd72.UNKNOWN_LOOKUP[prog_vfo]
438

    
439
        if mem.number < 999:
440
            flag.skip = chirp_common.SKIP_VALUES.index(mem.skip)
(5-5/10)