Project

General

Profile

New Model #11122 » thd74.py

3fb07326 - Dan Smith, 02/12/2024 07:13 AM

 
1
import itertools
2
import logging
3
import struct
4
import sys
5
import time
6

    
7
from chirp import bitwise
8
from chirp import chirp_common
9
from chirp import directory
10
from chirp import errors
11
from chirp import memmap
12

    
13
LOG = logging.getLogger(__name__)
14

    
15
CALL_CHANS = ['VHF Call (A)',
16
              'VHF Call (D)',
17
              '220M Call (A)',
18
              '220M Call (D)',
19
              'UHF Call (A)',
20
              'UHF Call (D)']
21

    
22
# This is the order of special channels in memory directly after
23
# regular memory #999
24
EXTD_NUMBERS = list(itertools.chain(
25
    ['%s%02i' % (i % 2 and 'Upper' or 'Lower', i // 2) for i in range(100)],
26
    ['Priority'],
27
    ['WX%i' % (i + 1) for i in range(10)],
28
    [None for i in range(20)],  # 20-channel buffer?
29
    [CALL_CHANS[i] for i in range(len(CALL_CHANS))]))
30

    
31
D74_FILE_HEADER = (
32
    b'MCP-D74\xFFV1.03\xFF\xFF\xFF' +
33
    b'TH-D74' + (b'\xFF' * 10) +
34
    b'\x00' + (b'\xFF' * 15) +
35
    b'\xFF' * (5 * 16) +
36
    b'K2' + (b'\xFF' * 14) +
37
    b'\xFF' * (7 * 16))
38
GROUP_NAME_OFFSET = 1152
39

    
40
MEM_FORMAT = """
41
#seekto 0x2000;
42
struct {
43
  u8 used;
44
  u8 unknown1:7,
45
     lockout:1;
46
  u8 group;
47
  u8 unknownFF;
48
} flags[1200];
49

    
50
#seekto 0x4000;
51
struct {
52
  struct {
53
    ul32 freq;
54
    ul32 offset;
55
    u8 tuning_step:4,
56
       split_tuning_step:3,
57
       unknown2:1;
58
    u8 unknown3_0:1,
59
       mode:3,
60
       narrow:1,
61
       fine_mode:1,
62
       fine_step:2;
63
    u8 tone_mode:1,
64
       ctcss_mode:1,
65
       dtcs_mode:1,
66
       cross_mode:1,
67
       unknown4_0:1,
68
       split:1,
69
       duplex:2;
70
    u8 rtone;
71
    u8 unknownctone:2,
72
       ctone:6;
73
    u8 unknowndtcs:1,
74
       dtcs_code:7;
75
    u8 unknown5_1:2,
76
       cross_mode_mode:2,
77
       unknown5_2:2,
78
       dig_squelch:2;
79
    char dv_urcall[8];
80
    char dv_rpt1call[8];
81
    char dv_rpt2call[8];
82
    u8 unknown9:1,
83
       dv_code:7;
84
  } memories[6];
85
  u8 pad[16];
86
} memgroups[210];
87

    
88
#seekto 0x10000;
89
struct {
90
  char name[16];
91
} names[1200];
92
"""
93

    
94

    
95
def decode_call(call):
96
    return ''.join(str(c) for c in call if ord(str(c)) > 0)
97

    
98

    
99
def encode_call(call):
100
    return call[:8].ljust(8, '\x00')
101

    
102

    
103
DUPLEX = ['', '+', '-']
104
TUNE_STEPS = [5.0, 6.25, 8.33, 9.0, 10.0, 12.5, 15.0, 20.0, 25.0, 30.0, 50.0,
105
              100.0]
106
CROSS_MODES = ['DTCS->', 'Tone->DTCS', 'DTCS->Tone', 'Tone->Tone']
107
MODES = ['FM', 'DV', 'AM', 'LSB', 'USB', 'CW', 'NFM',
108
         'DV',  # Actually DR in the radio
109
         ]
110
DSQL_MODES = ['', 'Code', 'Callsign']
111
FINE_STEPS = [20, 100, 500, 1000]
112

    
113

    
114
class KenwoodGroup(chirp_common.NamedBank):
115
    def __init__(self, model, index):
116
        # Default name until we are initialized, then we will report
117
        # the value from memory
118
        super().__init__(model, index, 'GRP-%i' % index)
119

    
120
    def get_name(self):
121
        name = self._model._radio._memobj.names[
122
            GROUP_NAME_OFFSET + self._index].name
123
        return str(name).rstrip()
124

    
125
    def set_name(self, name):
126
        names = self._model._radio._memobj.names
127
        names[GROUP_NAME_OFFSET + self._index].name = str(name)[:16].ljust(16)
128
        super().set_name(name.strip())
129

    
130

    
131
class KenwoodTHD74Bankmodel(chirp_common.BankModel):
132
    channelAlwaysHasBank = True
133

    
134
    def get_num_mappings(self):
135
        return 30
136

    
137
    def get_mappings(self):
138
        groups = []
139
        for i in range(self.get_num_mappings()):
140
            groups.append(KenwoodGroup(self, i))
141
        return groups
142

    
143
    def add_memory_to_mapping(self, memory, bank):
144
        self._radio._memobj.flags[memory.number].group = bank.get_index()
145

    
146
    def remove_memory_from_mapping(self, memory, bank):
147
        self._radio._memobj.flags[memory.number].group = 0
148

    
149
    def get_mapping_memories(self, bank):
150
        features = self._radio.get_features()
151
        memories = []
152
        for i in range(0, features.memory_bounds[1]):
153
            if self._radio._memobj.flags[i].group == bank.get_index():
154
                memories.append(self._radio.get_memory(i))
155
        return memories
156

    
157
    def get_memory_mappings(self, memory):
158
        index = self._radio._memobj.flags[memory.number].group
159
        return [self.get_mappings()[index]]
160

    
161

    
162
def get_used_flag(mem):
163
    if mem.empty:
164
        return 0xFF
165
    if mem.duplex == 'split':
166
        freq = mem.offset
167
    else:
168
        freq = mem.freq
169

    
170
    if freq < chirp_common.to_MHz(150):
171
        return 0x00
172
    elif freq < chirp_common.to_MHz(400):
173
        return 0x01
174
    else:
175
        return 0x02
176

    
177

    
178
@directory.register
179
class THD74Radio(chirp_common.CloneModeRadio,
180
                 chirp_common.IcomDstarSupport):
181
    VENDOR = "Kenwood"
182
    MODEL = "TH-D74 (clone mode)"
183
    NEEDS_COMPAT_SERIAL = False
184
    BAUD_RATE = 9600
185
    HARDWARE_FLOW = sys.platform == "darwin"  # only OS X driver needs hw flow
186
    FORMATS = [directory.register_format('Kenwood MCP-D74', '*.d74')]
187

    
188
    _memsize = 0x7A300
189

    
190
    def read_block(self, block, count=256):
191
        hdr = struct.pack(">cHH", b"R", block, 0)
192
        self.pipe.write(hdr)
193
        r = self.pipe.read(5)
194
        if len(r) != 5:
195
            raise errors.RadioError("Did not receive block response")
196

    
197
        cmd, _block, zero = struct.unpack(">cHH", r)
198
        if cmd != b"W" or _block != block:
199
            raise errors.RadioError("Invalid response: %s %i" % (cmd, _block))
200

    
201
        data = b""
202
        while len(data) < count:
203
            data += self.pipe.read(count - len(data))
204

    
205
        self.pipe.write(b'\x06')
206
        if self.pipe.read(1) != b'\x06':
207
            raise errors.RadioError("Did not receive post-block ACK!")
208

    
209
        return data
210

    
211
    def write_block(self, block, map, size=256):
212
        hdr = struct.pack(">cHH", b"W", block, size < 256 and size or 0)
213
        base = block * size
214
        data = map[base:base + size]
215
        self.pipe.write(hdr + data)
216
        self.pipe.flush()
217

    
218
        for i in range(10):
219
            ack = self.pipe.read(1)
220
            if ack != b'\x06':
221
                LOG.error('Ack for block %i was: %r' % (block, ack))
222
            else:
223
                break
224
        return ack == b'\x06'
225

    
226
    def download(self, raw=False, blocks=None):
227
        if blocks is None:
228
            blocks = range(self._memsize // 256)
229
        else:
230
            blocks = [b for b in blocks if b < self._memsize // 256]
231

    
232
        if self.command("0M PROGRAM") != "0M":
233
            raise errors.RadioError("No response from self")
234

    
235
        allblocks = range(self._memsize // 256)
236
        self.pipe.baudrate = 57600
237
        self.pipe.read(1)
238
        data = b""
239
        LOG.debug("reading blocks %d..%d" % (blocks[0], blocks[-1]))
240
        total = len(blocks)
241
        count = 0
242
        for i in allblocks:
243
            if i not in blocks:
244
                data += 256 * b'\xff'
245
                continue
246
            data += self.read_block(i)
247
            count += 1
248
            if self.status_fn:
249
                s = chirp_common.Status()
250
                s.msg = "Cloning from radio"
251
                s.max = total
252
                s.cur = count
253
                self.status_fn(s)
254

    
255
        self.pipe.write(b"E")
256

    
257
        if raw:
258
            return data
259
        return memmap.MemoryMapBytes(data)
260

    
261
    def upload(self, blocks=None):
262
        if blocks is None:
263
            blocks = range((self._memsize // 256) - 2)
264
        else:
265
            blocks = [b for b in blocks if b < self._memsize // 256]
266

    
267
        if self.command("0M PROGRAM") != "0M":
268
            raise errors.RadioError("No response from self")
269

    
270
        self.pipe.baudrate = 57600
271
        self.pipe.read(1)
272

    
273
        try:
274
            LOG.debug("writing blocks %d..%d" % (blocks[0], blocks[-1]))
275
            total = len(blocks)
276
            count = 0
277
            for i in blocks:
278
                r = self.write_block(i, self._mmap)
279
                count += 1
280
                if not r:
281
                    raise errors.RadioError("self NAK'd block %i" % i)
282
                if self.status_fn:
283
                    s = chirp_common.Status()
284
                    s.msg = "Cloning to radio"
285
                    s.max = total
286
                    s.cur = count
287
                    self.status_fn(s)
288
        finally:
289
            self.pipe.write(b"E")
290

    
291
    def command(self, cmd, timeout=1):
292
        start = time.time()
293

    
294
        data = b""
295
        LOG.debug("PC->D74: %s" % cmd)
296
        self.pipe.write((cmd + "\r").encode())
297
        while not data.endswith(b"\r") and (time.time() - start) < timeout:
298
            data += self.pipe.read(1)
299
        LOG.debug("D74->PC: %s" % data.strip())
300
        return data.decode().strip()
301

    
302
    def get_id(self):
303
        r = self.command("ID")
304
        if r.startswith("ID "):
305
            return r.split(" ")[1]
306
        else:
307
            raise errors.RadioError("No response to ID command")
308

    
309
    def _detect_baud(self):
310
        id = None
311
        for baud in [9600, 19200, 38400, 57600]:
312
            self.pipe.baudrate = baud
313
            try:
314
                self.pipe.write(b"\r\r")
315
            except Exception:
316
                break
317
            self.pipe.read(32)
318
            try:
319
                id = self.get_id()
320
                LOG.info("Radio %s at %i baud" % (id, baud))
321
                break
322
            except errors.RadioError:
323
                pass
324

    
325
        if id and not self.MODEL.startswith(id):
326
            raise errors.RadioError(_('Unsupported model %r' % id))
327
        elif id:
328
            return id
329
        else:
330
            raise errors.RadioError("No response from radio")
331

    
332
    def process_mmap(self):
333
        self._memobj = bitwise.parse(MEM_FORMAT, self._mmap)
334

    
335
    def sync_in(self):
336
        self._detect_baud()
337
        self._mmap = self.download()
338
        self.process_mmap()
339

    
340
    def sync_out(self):
341
        self._detect_baud()
342
        self.upload()
343

    
344
    def load_mmap(self, filename):
345
        if filename.lower().endswith('.d74'):
346
            with open(filename, 'rb') as f:
347
                f.seek(0x100)
348
                self._mmap = memmap.MemoryMapBytes(f.read())
349
                LOG.info('Loaded MCP d74 file at offset 0x100')
350
            self.process_mmap()
351
        else:
352
            chirp_common.CloneModeRadio.load_mmap(self, filename)
353

    
354
    def save_mmap(self, filename):
355
        if filename.lower().endswith('.d74'):
356
            with open(filename, 'wb') as f:
357
                f.write(D74_FILE_HEADER)
358
                f.write(self._mmap.get_packed())
359
                LOG.info('Wrote MCP d74 file')
360
        else:
361
            chirp_common.CloneModeRadio.save_mmap(self, filename)
362

    
363
    def get_features(self):
364
        rf = chirp_common.RadioFeatures()
365
        rf.valid_tuning_steps = list(TUNE_STEPS)
366
        rf.valid_tmodes = ['', 'Tone', 'TSQL', 'DTCS', 'Cross']
367
        rf.valid_cross_modes = list(CROSS_MODES)
368
        rf.valid_duplexes = DUPLEX + ['split']
369
        rf.valid_skips = ['', 'S']
370
        rf.valid_modes = list(MODES)
371
        rf.valid_characters = chirp_common.CHARSET_ASCII
372
        rf.valid_name_length = 16
373
        rf.valid_bands = [(100000, 470000000)]
374
        rf.valid_special_chans = [x for x in EXTD_NUMBERS if x]
375
        rf.has_cross = True
376
        rf.has_dtcs_polarity = False
377
        rf.has_bank = True
378
        rf.has_bank_names = True
379
        rf.can_odd_split = True
380
        rf.requires_call_lists = False
381
        rf.memory_bounds = (0, 999)
382
        return rf
383

    
384
    def _get_raw_memory(self, number):
385
        # Why Kenwood ... WHY?
386
        return self._memobj.memgroups[number // 6].memories[number % 6]
387

    
388
    def get_memory(self, number):
389
        if isinstance(number, str):
390
            extd_number = number
391
            number = 1000 + EXTD_NUMBERS.index(number)
392
        else:
393
            extd_number = None
394

    
395
        _mem = self._get_raw_memory(number)
396
        _flg = self._memobj.flags[number]
397

    
398
        if MODES[_mem.mode] == 'DV':
399
            mem = chirp_common.DVMemory()
400
        else:
401
            mem = chirp_common.Memory()
402

    
403
        mem.number = number
404
        if extd_number:
405
            mem.extd_number = extd_number
406

    
407
        if _flg.used == 0xFF:
408
            mem.empty = True
409
            return mem
410

    
411
        mem.freq = int(_mem.freq)
412
        if 'Call' in mem.extd_number:
413
            name_index_adj = 5
414
        else:
415
            name_index_adj = 0
416
        _nam = self._memobj.names[number + name_index_adj]
417
        mem.name = str(_nam.name).rstrip().strip('\x00')
418
        mem.offset = int(_mem.offset)
419
        if _mem.split:
420
            mem.duplex = 'split'
421
        else:
422
            mem.duplex = DUPLEX[_mem.duplex]
423
        mem.tuning_step = TUNE_STEPS[_mem.tuning_step]
424
        mem.mode = MODES[_mem.mode]
425
        mem.rtone = chirp_common.TONES[_mem.rtone]
426
        mem.ctone = chirp_common.TONES[_mem.ctone]
427
        mem.dtcs = chirp_common.DTCS_CODES[_mem.dtcs_code]
428

    
429
        if _mem.tone_mode:
430
            mem.tmode = 'Tone'
431
        elif _mem.ctcss_mode:
432
            mem.tmode = 'TSQL'
433
        elif _mem.dtcs_mode:
434
            mem.tmode = 'DTCS'
435
        elif _mem.cross_mode:
436
            mem.tmode = 'Cross'
437
            mem.cross_mode = CROSS_MODES[_mem.cross_mode_mode]
438
        else:
439
            mem.tmode = ''
440

    
441
        mem.skip = _flg.lockout and 'S' or ''
442

    
443
        if mem.mode == 'DV':
444
            mem.dv_urcall = decode_call(_mem.dv_urcall)
445
            mem.dv_rpt1call = decode_call(_mem.dv_rpt1call)
446
            mem.dv_rpt2call = decode_call(_mem.dv_rpt2call)
447
            mem.dv_code = int(_mem.dv_code)
448

    
449
        if mem.extd_number:
450
            mem.immutable.append('empty')
451

    
452
        if 'WX' in mem.extd_number:
453
            mem.tmode = ''
454
            mem.immutable.extend(['rtone', 'ctone', 'dtcs', 'rx_dtcs',
455
                                  'tmode', 'cross_mode', 'dtcs_polarity',
456
                                  'skip', 'power', 'offset', 'mode',
457
                                  'tuning_step'])
458
        if 'Call' in mem.extd_number and mem.mode == 'DV':
459
            mem.immutable.append('mode')
460

    
461
        return mem
462

    
463
    def set_memory(self, mem):
464

    
465
        if mem.number > 999 and 'Call' in EXTD_NUMBERS[mem.number - 1000]:
466
            name_index_adj = 5
467
        else:
468
            name_index_adj = 0
469

    
470
        _mem = self._get_raw_memory(mem.number)
471
        _flg = self._memobj.flags[mem.number]
472
        _nam = self._memobj.names[mem.number + name_index_adj]
473

    
474
        _flg.used = get_used_flag(mem)
475

    
476
        if mem.empty:
477
            _flg.lockout = 0
478
            _flg.group = 0
479
            _nam.name = ('\x00' * 16)
480
            _mem.set_raw(b'\xFF' * 40)
481
            return
482

    
483
        _mem.set_raw(b'\x00' * 40)
484

    
485
        _flg.group = 0  # FIXME
486

    
487
        _mem.freq = mem.freq
488
        _nam.name = mem.name.ljust(16)
489
        _mem.offset = int(mem.offset)
490
        if mem.duplex == 'split':
491
            _mem.split = True
492
            _mem.duplex = 0
493
            _mem.split_tuning_step = TUNE_STEPS.index(
494
                chirp_common.required_step(mem.offset))
495
        else:
496
            _mem.split = False
497
            _mem.duplex = DUPLEX.index(mem.duplex)
498
        _mem.tuning_step = TUNE_STEPS.index(mem.tuning_step)
499
        _mem.mode = MODES.index(mem.mode)
500
        _mem.narrow = mem.mode == 'NFM'
501
        _mem.rtone = chirp_common.TONES.index(mem.rtone)
502
        _mem.ctone = chirp_common.TONES.index(mem.ctone)
503
        _mem.dtcs_code = chirp_common.DTCS_CODES.index(mem.dtcs)
504

    
505
        _mem.tone_mode = mem.tmode == 'Tone'
506
        _mem.ctcss_mode = mem.tmode == 'TSQL'
507
        _mem.dtcs_mode = mem.tmode == 'DTCS'
508
        _mem.cross_mode = mem.tmode == 'Cross'
509

    
510
        if mem.tmode == 'Cross':
511
            _mem.cross_mode_mode = CROSS_MODES.index(mem.cross_mode)
512

    
513
        _flg.lockout = mem.skip == 'S'
514
        if isinstance(mem, chirp_common.DVMemory):
515
            _mem.dv_urcall = encode_call(mem.dv_urcall)
516
            _mem.dv_rpt1call = encode_call(mem.dv_rpt1call)
517
            _mem.dv_rpt2call = encode_call(mem.dv_rpt2call)
518
            _mem.dv_code = mem.dv_code
519

    
520
    def get_raw_memory(self, number):
521
        return (repr(self._get_raw_memory(number)) +
522
                repr(self._memobj.flags[number]))
523

    
524
    def get_bank_model(self):
525
        return KenwoodTHD74Bankmodel(self)
526

    
527
    @classmethod
528
    def match_model(cls, filedata, filename):
529
        if filename.endswith('.d74'):
530
            return True
531
        else:
532
            return chirp_common.CloneModeRadio.match_model(filedata, filename)
533

    
534

    
535
@directory.register
536
class THD75Radio(THD74Radio):
537
    MODEL = 'TH-D75'
(2-2/4)