Project

General

Profile

Bug #10760 ยป thd74.py

db0df41b - Dan Smith, 07/30/2023 11:10 AM

 
1
import itertools
2
import logging
3
import struct
4
import sys
5
import time
6

    
7
from chirp import bitwise
8
from chirp import chirp_common
9
from chirp import directory
10
from chirp import errors
11
from chirp import memmap
12

    
13
LOG = logging.getLogger(__name__)
14

    
15
CALL_CHANS = ['VHF Call (A)',
16
              'VHF Call (D)',
17
              '220M Call (A)',
18
              '220M Call (D)',
19
              'UHF Call (A)',
20
              'UHF Call (D)']
21

    
22
# This is the order of special channels in memory directly after
23
# regular memory #999
24
EXTD_NUMBERS = list(itertools.chain(
25
    ['%s%02i' % (i % 2 and 'Upper' or 'Lower', i // 2) for i in range(100)],
26
    ['Priority'],
27
    ['WX%i' % (i + 1) for i in range(10)],
28
    [None for i in range(20)],  # 20-channel buffer?
29
    [CALL_CHANS[i] for i in range(len(CALL_CHANS))]))
30

    
31
D74_FILE_HEADER = (
32
    b'MCP-D74\xFFV1.03\xFF\xFF\xFF' +
33
    b'TH-D74' + (b'\xFF' * 10) +
34
    b'\x00' + (b'\xFF' * 15) +
35
    b'\xFF' * (5 * 16) +
36
    b'K2' + (b'\xFF' * 14) +
37
    b'\xFF' * (7 * 16))
38
GROUP_NAME_OFFSET = 1152
39

    
40
MEM_FORMAT = """
41
#seekto 0x2000;
42
struct {
43
  u8 used;
44
  u8 unknown1:7,
45
     lockout:1;
46
  u8 group;
47
  u8 unknownFF;
48
} flags[1200];
49

    
50
#seekto 0x4000;
51
struct {
52
  struct {
53
    ul32 freq;
54
    ul32 offset;
55
    u8 tuning_step:4,
56
       split_tuning_step:3,
57
       unknown2:1;
58
    u8 unknown3_0:1,
59
       mode:3,
60
       narrow:1,
61
       fine_mode:1,
62
       fine_step:2;
63
    u8 tone_mode:1,
64
       ctcss_mode:1,
65
       dtcs_mode:1,
66
       cross_mode:1,
67
       unknown4_0:1,
68
       split:1,
69
       duplex:2;
70
    u8 rtone;
71
    u8 unknownctone:2,
72
       ctone:6;
73
    u8 unknowndtcs:1,
74
       dtcs_code:7;
75
    u8 unknown5_1:2,
76
       cross_mode_mode:2,
77
       unknown5_2:2,
78
       dig_squelch:2;
79
    char dv_urcall[8];
80
    char dv_rpt1call[8];
81
    char dv_rpt2call[8];
82
    u8 unknown9:1,
83
       dv_code:7;
84
  } memories[6];
85
  u8 pad[16];
86
} memgroups[210];
87

    
88
#seekto 0x10000;
89
struct {
90
  char name[16];
91
} names[1200];
92
"""
93

    
94

    
95
def decode_call(call):
96
    return ''.join(str(c) for c in call if ord(str(c)) > 0)
97

    
98

    
99
def encode_call(call):
100
    return call[:8].ljust(8, '\x00')
101

    
102

    
103
DUPLEX = ['', '+', '-']
104
TUNE_STEPS = [5.0, 6.25, 8.33, 9.0, 10.0, 12.5, 15.0, 20.0, 25.0, 30.0, 50.0,
105
              100.0]
106
CROSS_MODES = ['DTCS->', 'Tone->DTCS', 'DTCS->Tone', 'Tone->Tone']
107
MODES = ['FM', 'DV', 'AM', 'LSB', 'USB', 'CW', 'NFM',
108
         'DV',  # Actually DR in the radio
109
         ]
110
DSQL_MODES = ['', 'Code', 'Callsign']
111
FINE_STEPS = [20, 100, 500, 1000]
112

    
113

    
114
class KenwoodGroup(chirp_common.NamedBank):
115
    def __init__(self, model, index):
116
        # Default name until we are initialized, then we will report
117
        # the value from memory
118
        super().__init__(model, index, 'GRP-%i' % index)
119

    
120
    def get_name(self):
121
        name = self._model._radio._memobj.names[
122
            GROUP_NAME_OFFSET + self._index].name
123
        return str(name).rstrip()
124

    
125
    def set_name(self, name):
126
        names = self._model._radio._memobj.names
127
        names[GROUP_NAME_OFFSET + self._index].name = str(name)[:16].ljust(16)
128
        super().set_name(name.strip())
129

    
130

    
131
class KenwoodTHD74Bankmodel(chirp_common.BankModel):
132
    channelAlwaysHasBank = True
133

    
134
    def get_num_mappings(self):
135
        return 30
136

    
137
    def get_mappings(self):
138
        groups = []
139
        for i in range(self.get_num_mappings()):
140
            groups.append(KenwoodGroup(self, i))
141
        return groups
142

    
143
    def add_memory_to_mapping(self, memory, bank):
144
        self._radio._memobj.flags[memory.number].group = bank.get_index()
145

    
146
    def remove_memory_from_mapping(self, memory, bank):
147
        self._radio._memobj.flags[memory.number].group = 0
148

    
149
    def get_mapping_memories(self, bank):
150
        features = self._radio.get_features()
151
        memories = []
152
        for i in range(0, features.memory_bounds[1]):
153
            if self._radio._memobj.flags[i].group == bank.get_index():
154
                memories.append(self._radio.get_memory(i))
155
        return memories
156

    
157
    def get_memory_mappings(self, memory):
158
        index = self._radio._memobj.flags[memory.number].group
159
        return [self.get_mappings()[index]]
160

    
161

    
162
def get_used_flag(mem):
163
    if mem.empty:
164
        return 0xFF
165
    if mem.duplex == 'split':
166
        freq = mem.offset
167
    else:
168
        freq = mem.freq
169

    
170
    if freq < chirp_common.to_MHz(150):
171
        return 0x00
172
    elif freq < chirp_common.to_MHz(400):
173
        return 0x01
174
    else:
175
        return 0x02
176

    
177

    
178
@directory.register
179
class THD74Radio(chirp_common.CloneModeRadio,
180
                 chirp_common.IcomDstarSupport):
181
    VENDOR = "Kenwood"
182
    MODEL = "TH-D74 (clone mode)"
183
    NEEDS_COMPAT_SERIAL = False
184
    BAUD_RATE = 9600
185
    HARDWARE_FLOW = sys.platform == "darwin"  # only OS X driver needs hw flow
186
    FORMATS = [directory.register_format('Kenwood MCP-D74', '*.d74')]
187

    
188
    _memsize = 0x7A300
189

    
190
    def read_block(self, block, count=256):
191
        hdr = struct.pack(">cHH", b"R", block, 0)
192
        self.pipe.write(hdr)
193
        r = self.pipe.read(5)
194
        if len(r) != 5:
195
            raise errors.RadioError("Did not receive block response")
196

    
197
        cmd, _block, zero = struct.unpack(">cHH", r)
198
        if cmd != b"W" or _block != block:
199
            raise errors.RadioError("Invalid response: %s %i" % (cmd, _block))
200

    
201
        data = b""
202
        while len(data) < count:
203
            data += self.pipe.read(count - len(data))
204

    
205
        self.pipe.write(b'\x06')
206
        if self.pipe.read(1) != b'\x06':
207
            raise errors.RadioError("Did not receive post-block ACK!")
208

    
209
        return data
210

    
211
    def write_block(self, block, map, size=256):
212
        hdr = struct.pack(">cHH", b"W", block, size < 256 and size or 0)
213
        base = block * size
214
        data = map[base:base + size]
215
        self.pipe.write(hdr + data)
216
        self.pipe.flush()
217

    
218
        for i in range(10):
219
            ack = self.pipe.read(1)
220
            if ack != b'\x06':
221
                LOG.error('Ack for block %i was: %r' % (block, ack))
222
            else:
223
                break
224
        return ack == b'\x06'
225

    
226
    def download(self, raw=False, blocks=None):
227
        if blocks is None:
228
            blocks = range(self._memsize // 256)
229
        else:
230
            blocks = [b for b in blocks if b < self._memsize // 256]
231

    
232
        if self.command("0M PROGRAM") != "0M":
233
            raise errors.RadioError("No response from self")
234

    
235
        allblocks = range(self._memsize // 256)
236
        self.pipe.baudrate = 57600
237
        self.pipe.read(1)
238
        data = b""
239
        LOG.debug("reading blocks %d..%d" % (blocks[0], blocks[-1]))
240
        total = len(blocks)
241
        count = 0
242
        for i in allblocks:
243
            if i not in blocks:
244
                data += 256 * b'\xff'
245
                continue
246
            data += self.read_block(i)
247
            count += 1
248
            if self.status_fn:
249
                s = chirp_common.Status()
250
                s.msg = "Cloning from radio"
251
                s.max = total
252
                s.cur = count
253
                self.status_fn(s)
254

    
255
        self.pipe.write(b"E")
256

    
257
        if raw:
258
            return data
259
        return memmap.MemoryMapBytes(data)
260

    
261
    def upload(self, blocks=None):
262
        if blocks is None:
263
            blocks = range((self._memsize // 256) - 2)
264
        else:
265
            blocks = [b for b in blocks if b < self._memsize // 256]
266

    
267
        if self.command("0M PROGRAM") != "0M":
268
            raise errors.RadioError("No response from self")
269

    
270
        self.pipe.baudrate = 57600
271
        self.pipe.read(1)
272

    
273
        try:
274
            LOG.debug("writing blocks %d..%d" % (blocks[0], blocks[-1]))
275
            total = len(blocks)
276
            count = 0
277
            for i in blocks:
278
                r = self.write_block(i, self._mmap)
279
                count += 1
280
                if not r:
281
                    raise errors.RadioError("self NAK'd block %i" % i)
282
                if self.status_fn:
283
                    s = chirp_common.Status()
284
                    s.msg = "Cloning to radio"
285
                    s.max = total
286
                    s.cur = count
287
                    self.status_fn(s)
288
        finally:
289
            self.pipe.write(b"E")
290

    
291
    def command(self, cmd, timeout=1):
292
        start = time.time()
293

    
294
        data = b""
295
        LOG.debug("PC->D74: %s" % cmd)
296
        self.pipe.write((cmd + "\r").encode())
297
        while not data.endswith(b"\r") and (time.time() - start) < timeout:
298
            data += self.pipe.read(1)
299
        LOG.debug("D74->PC: %s" % data.strip())
300
        return data.decode().strip()
301

    
302
    def get_id(self):
303
        r = self.command("ID")
304
        if r.startswith("ID "):
305
            return r.split(" ")[1]
306
        else:
307
            raise errors.RadioError("No response to ID command")
308

    
309
    def _detect_baud(self):
310
        for baud in [9600, 19200, 38400, 57600]:
311
            self.pipe.baudrate = baud
312
            try:
313
                self.pipe.write(b"\r\r")
314
            except Exception:
315
                break
316
            self.pipe.read(32)
317
            try:
318
                id = self.get_id()
319
                LOG.info("Radio %s at %i baud" % (id, baud))
320
                return True
321
            except errors.RadioError:
322
                pass
323

    
324
        raise errors.RadioError("No response from radio")
325

    
326
    def process_mmap(self):
327
        self._memobj = bitwise.parse(MEM_FORMAT, self._mmap)
328

    
329
    def sync_in(self):
330
        self._detect_baud()
331
        self._mmap = self.download()
332
        self.process_mmap()
333

    
334
    def sync_out(self):
335
        self._detect_baud()
336
        self.upload()
337

    
338
    def load_mmap(self, filename):
339
        if filename.lower().endswith('.d74'):
340
            with open(filename, 'rb') as f:
341
                f.seek(0x100)
342
                self._mmap = memmap.MemoryMapBytes(f.read())
343
                LOG.info('Loaded MCP d74 file at offset 0x100')
344
            self.process_mmap()
345
        else:
346
            chirp_common.CloneModeRadio.load_mmap(self, filename)
347

    
348
    def save_mmap(self, filename):
349
        if filename.lower().endswith('.d74'):
350
            with open(filename, 'wb') as f:
351
                f.write(D74_FILE_HEADER)
352
                f.write(self._mmap.get_packed())
353
                LOG.info('Wrote MCP d74 file')
354
        else:
355
            chirp_common.CloneModeRadio.save_mmap(self, filename)
356

    
357
    def get_features(self):
358
        rf = chirp_common.RadioFeatures()
359
        rf.valid_tuning_steps = list(TUNE_STEPS)
360
        rf.valid_tmodes = ['', 'Tone', 'TSQL', 'DTCS', 'Cross']
361
        rf.valid_cross_modes = list(CROSS_MODES)
362
        rf.valid_duplexes = DUPLEX + ['split']
363
        rf.valid_skips = ['', 'S']
364
        rf.valid_modes = list(MODES)
365
        rf.valid_characters = chirp_common.CHARSET_ASCII
366
        rf.valid_name_length = 16
367
        rf.valid_bands = [(100000, 470000000)]
368
        rf.valid_special_chans = [x for x in EXTD_NUMBERS if x]
369
        rf.has_cross = True
370
        rf.has_dtcs_polarity = False
371
        rf.has_bank = True
372
        rf.has_bank_names = True
373
        rf.can_odd_split = True
374
        rf.requires_call_lists = False
375
        rf.memory_bounds = (0, 999)
376
        return rf
377

    
378
    def _get_raw_memory(self, number):
379
        # Why Kenwood ... WHY?
380
        return self._memobj.memgroups[number // 6].memories[number % 6]
381

    
382
    def get_memory(self, number):
383
        if isinstance(number, str):
384
            extd_number = number
385
            number = 1000 + EXTD_NUMBERS.index(number)
386
        else:
387
            extd_number = None
388

    
389
        _mem = self._get_raw_memory(number)
390
        _flg = self._memobj.flags[number]
391

    
392
        if MODES[_mem.mode] == 'DV':
393
            mem = chirp_common.DVMemory()
394
        else:
395
            mem = chirp_common.Memory()
396

    
397
        mem.number = number
398
        if extd_number:
399
            mem.extd_number = extd_number
400

    
401
        if _flg.used == 0xFF:
402
            mem.empty = True
403
            return mem
404

    
405
        mem.freq = int(_mem.freq)
406
        if 'Call' in mem.extd_number:
407
            name_index_adj = 5
408
        else:
409
            name_index_adj = 0
410
        _nam = self._memobj.names[number + name_index_adj]
411
        mem.name = str(_nam.name).rstrip().strip('\x00')
412
        mem.offset = int(_mem.offset)
413
        if _mem.split:
414
            mem.duplex = 'split'
415
        else:
416
            mem.duplex = DUPLEX[_mem.duplex]
417
        mem.tuning_step = TUNE_STEPS[_mem.tuning_step]
418
        mem.mode = MODES[_mem.mode]
419
        mem.rtone = chirp_common.TONES[_mem.rtone]
420
        mem.ctone = chirp_common.TONES[_mem.ctone]
421
        mem.dtcs = chirp_common.DTCS_CODES[_mem.dtcs_code]
422

    
423
        if _mem.tone_mode:
424
            mem.tmode = 'Tone'
425
        elif _mem.ctcss_mode:
426
            mem.tmode = 'TSQL'
427
        elif _mem.dtcs_mode:
428
            mem.tmode = 'DTCS'
429
        elif _mem.cross_mode:
430
            mem.tmode = 'Cross'
431
            mem.cross_mode = CROSS_MODES[_mem.cross_mode_mode]
432
        else:
433
            mem.tmode = ''
434

    
435
        mem.skip = _flg.lockout and 'S' or ''
436

    
437
        if mem.mode == 'DV':
438
            mem.dv_urcall = decode_call(_mem.dv_urcall)
439
            mem.dv_rpt1call = decode_call(_mem.dv_rpt1call)
440
            mem.dv_rpt2call = decode_call(_mem.dv_rpt2call)
441
            mem.dv_code = int(_mem.dv_code)
442

    
443
        if mem.extd_number:
444
            mem.immutable.append('empty')
445

    
446
        if 'WX' in mem.extd_number:
447
            mem.tmode = ''
448
            mem.immutable.extend(['rtone', 'ctone', 'dtcs', 'rx_dtcs',
449
                                  'tmode', 'cross_mode', 'dtcs_polarity',
450
                                  'skip', 'power', 'offset', 'mode',
451
                                  'tuning_step'])
452
        if 'Call' in mem.extd_number and mem.mode == 'DV':
453
            mem.immutable.append('mode')
454

    
455
        return mem
456

    
457
    def set_memory(self, mem):
458

    
459
        if mem.number > 999 and 'Call' in EXTD_NUMBERS[mem.number - 1000]:
460
            name_index_adj = 5
461
        else:
462
            name_index_adj = 0
463

    
464
        _mem = self._get_raw_memory(mem.number)
465
        _flg = self._memobj.flags[mem.number]
466
        _nam = self._memobj.names[mem.number + name_index_adj]
467

    
468
        _flg.used = get_used_flag(mem)
469

    
470
        if mem.empty:
471
            _flg.lockout = 0
472
            _flg.group = 0
473
            _nam.name = ('\x00' * 16)
474
            _mem.set_raw(b'\xFF' * 40)
475
            return
476

    
477
        _mem.set_raw(b'\x00' * 40)
478

    
479
        _flg.group = 0  # FIXME
480

    
481
        _mem.freq = mem.freq
482
        _nam.name = mem.name.ljust(16)
483
        _mem.offset = int(mem.offset)
484
        if mem.duplex == 'split':
485
            _mem.split = True
486
            _mem.duplex = 0
487
            _mem.split_tuning_step = TUNE_STEPS.index(
488
                chirp_common.required_step(mem.offset))
489
        else:
490
            _mem.split = False
491
            _mem.duplex = DUPLEX.index(mem.duplex)
492
        _mem.tuning_step = TUNE_STEPS.index(mem.tuning_step)
493
        _mem.mode = MODES.index(mem.mode)
494
        _mem.narrow = mem.mode == 'NFM'
495
        _mem.rtone = chirp_common.TONES.index(mem.rtone)
496
        _mem.ctone = chirp_common.TONES.index(mem.ctone)
497
        _mem.dtcs_code = chirp_common.DTCS_CODES.index(mem.dtcs)
498

    
499
        _mem.tone_mode = mem.tmode == 'Tone'
500
        _mem.ctcss_mode = mem.tmode == 'TSQL'
501
        _mem.dtcs_mode = mem.tmode == 'DTCS'
502
        _mem.cross_mode = mem.tmode == 'Cross'
503

    
504
        if mem.tmode == 'Cross':
505
            _mem.cross_mode_mode = CROSS_MODES.index(mem.cross_mode)
506

    
507
        _flg.lockout = mem.skip == 'S'
508
        if isinstance(mem, chirp_common.DVMemory):
509
            _mem.dv_urcall = encode_call(mem.dv_urcall)
510
            _mem.dv_rpt1call = encode_call(mem.dv_rpt1call)
511
            _mem.dv_rpt2call = encode_call(mem.dv_rpt2call)
512
            _mem.dv_code = mem.dv_code
513

    
514
    def get_raw_memory(self, number):
515
        return (repr(self._get_raw_memory(number)) +
516
                repr(self._memobj.flags[number]))
517

    
518
    def get_bank_model(self):
519
        return KenwoodTHD74Bankmodel(self)
520

    
521
    @classmethod
522
    def match_model(cls, filedata, filename):
523
        if filename.endswith('.d74'):
524
            return True
525
        else:
526
            return chirp_common.CloneModeRadio.match_model(filedata, filename)
    (1-1/1)