Project

General

Profile

Bug #8745 ยป ic9x.py

patched ic9x.py - Jonathan Woytek, 01/27/2021 05:33 PM

 
1
# Copyright 2008 Dan Smith <dsmith@danplanet.com>
2
#
3
# This program is free software: you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation, either version 3 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
15

    
16
import time
17
import threading
18
import logging
19

    
20
from chirp.drivers import ic9x_ll, icf
21
from chirp import chirp_common, errors, util, directory
22
from chirp import bitwise
23

    
24
LOG = logging.getLogger(__name__)
25

    
26
IC9XA_SPECIAL = {}
27
IC9XB_SPECIAL = {}
28

    
29
for i in range(0, 25):
30
    idA = "%iA" % i
31
    idB = "%iB" % i
32
    Anum = 800 + i * 2
33
    Bnum = 400 + i * 2
34

    
35
    IC9XA_SPECIAL[idA] = Anum
36
    IC9XA_SPECIAL[idB] = Bnum
37

    
38
    IC9XB_SPECIAL[idA] = Bnum
39
    IC9XB_SPECIAL[idB] = Bnum + 1
40

    
41
IC9XA_SPECIAL["C0"] = IC9XB_SPECIAL["C0"] = -1
42
IC9XA_SPECIAL["C1"] = IC9XB_SPECIAL["C1"] = -2
43

    
44
IC9X_SPECIAL = {
45
    0: {},
46
    1: IC9XA_SPECIAL,
47
    2: IC9XB_SPECIAL,
48
}
49

    
50
CHARSET = chirp_common.CHARSET_ALPHANUMERIC + \
51
    "!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~"
52

    
53
LOCK = threading.Lock()
54

    
55

    
56
class IC9xBank(icf.IcomNamedBank):
57
    """Icom 9x Bank"""
58
    def get_name(self):
59
        banks = self._model._radio._ic9x_get_banks()
60
        return banks[self.index]
61

    
62
    def set_name(self, name):
63
        banks = self._model._radio._ic9x_get_banks()
64
        banks[self.index] = name
65
        self._model._radio._ic9x_set_banks(banks)
66

    
67

    
68
@directory.register
69
class IC9xRadio(icf.IcomLiveRadio):
70
    """Base class for Icom IC-9x radios"""
71
    MODEL = "IC-91/92AD"
72

    
73
    _model = "ic9x"    # Fake model info for detect.py
74
    vfo = 0
75
    __last = 0
76
    _upper = 300
77

    
78
    _num_banks = 26
79
    _bank_class = IC9xBank
80

    
81
    def _get_bank(self, loc):
82
        mem = self.get_memory(loc)
83
        try:
84
            bank = mem._bank
85
        except AttributeError:
86
            # no bank attribute?
87
            bank = None
88
            print("no bank attribute for memory {}".format(loc))
89
        return bank
90

    
91
    def _set_bank(self, loc, bank):
92
        mem = self.get_memory(loc)
93
        mem._bank = bank
94
        self.set_memory(mem)
95

    
96
    def _get_bank_index(self, loc):
97
        mem = self.get_memory(loc)
98
        return mem._bank_index
99

    
100
    def _set_bank_index(self, loc, index):
101
        mem = self.get_memory(loc)
102
        mem._bank_index = index
103
        self.set_memory(mem)
104

    
105
    def __init__(self, *args, **kwargs):
106
        icf.IcomLiveRadio.__init__(self, *args, **kwargs)
107

    
108
        if self.pipe:
109
            self.pipe.timeout = 0.1
110

    
111
        self.__memcache = {}
112
        self.__bankcache = {}
113

    
114
        global LOCK
115
        self._lock = LOCK
116

    
117
    def _maybe_send_magic(self):
118
        if (time.time() - self.__last) > 1:
119
            LOG.debug("Sending magic")
120
            ic9x_ll.send_magic(self.pipe)
121
        self.__last = time.time()
122

    
123
    def get_memory(self, number):
124
        if isinstance(number, str):
125
            try:
126
                number = IC9X_SPECIAL[self.vfo][number]
127
            except KeyError:
128
                raise errors.InvalidMemoryLocation(
129
                        "Unknown channel %s" % number)
130

    
131
        if number < -2 or number > 999:
132
            raise errors.InvalidValueError("Number must be between 0 and 999")
133

    
134
        if number in self.__memcache:
135
            return self.__memcache[number]
136

    
137
        self._lock.acquire()
138
        try:
139
            self._maybe_send_magic()
140
            mem = ic9x_ll.get_memory(self.pipe, self.vfo, number)
141
        except errors.InvalidMemoryLocation:
142
            mem = ic9x_ll.IC9xMemory()
143
            mem.number = number
144
            if number < self._upper:
145
                mem.empty = True
146
        except:
147
            self._lock.release()
148
            raise
149

    
150
        self._lock.release()
151

    
152
        if number > self._upper or number < 0:
153
            mem.extd_number = util.get_dict_rev(IC9X_SPECIAL,
154
                                                [self.vfo][number])
155
            mem.immutable = ["number", "skip", "bank", "bank_index",
156
                             "extd_number"]
157

    
158
        self.__memcache[mem.number] = mem
159

    
160
        return mem
161

    
162
    def get_raw_memory(self, number):
163
        self._lock.acquire()
164
        try:
165
            ic9x_ll.send_magic(self.pipe)
166
            mframe = ic9x_ll.get_memory_frame(self.pipe, self.vfo, number)
167
        except:
168
            self._lock.release()
169
            raise
170

    
171
        self._lock.release()
172

    
173
        return repr(bitwise.parse(ic9x_ll.MEMORY_FRAME_FORMAT, mframe))
174

    
175
    def get_memories(self, lo=0, hi=None):
176
        if hi is None:
177
            hi = self._upper
178

    
179
        memories = []
180

    
181
        for i in range(lo, hi + 1):
182
            try:
183
                LOG.debug("Getting %i" % i)
184
                mem = self.get_memory(i)
185
                if mem:
186
                    memories.append(mem)
187
                LOG.debug("Done: %s" % mem)
188
            except errors.InvalidMemoryLocation:
189
                LOG.error("Invalid memory location: {}".format(i))
190
                pass
191
            except errors.InvalidDataError, e:
192
                LOG.error("Error talking to radio: %s" % e)
193
                break
194

    
195
        return memories
196

    
197
    def set_memory(self, _memory):
198
        # Make sure we mirror the DV-ness of the new memory we're
199
        # setting, and that we capture the Bank value of any currently
200
        # stored memory (unless the special type is provided) and
201
        # communicate that to the low-level routines with the special
202
        # subclass
203
        if isinstance(_memory, ic9x_ll.IC9xMemory) or \
204
                 isinstance(_memory, ic9x_ll.IC9xDVMemory):
205
            memory = _memory
206
        else:
207
            if isinstance(_memory, chirp_common.DVMemory):
208
                memory = ic9x_ll.IC9xDVMemory()
209
                memory.clone(self.get_memory(_memory.number))
210
            else:
211
                memory = ic9x_ll.IC9xMemory()
212
                memory.clone(self.get_memory(_memory.number))
213

    
214
            memory.clone(_memory)
215

    
216
        self._lock.acquire()
217
        self._maybe_send_magic()
218
        try:
219
            if memory.empty:
220
                ic9x_ll.erase_memory(self.pipe, self.vfo, memory.number)
221
            else:
222
                ic9x_ll.set_memory(self.pipe, self.vfo, memory)
223
            memory = ic9x_ll.get_memory(self.pipe, self.vfo, memory.number)
224
        except:
225
            self._lock.release()
226
            raise
227

    
228
        self._lock.release()
229

    
230
        self.__memcache[memory.number] = memory
231

    
232
    def _ic9x_get_banks(self):
233
        if len(self.__bankcache.keys()) == 26:
234
            return [self.__bankcache[k] for k in
235
                    sorted(self.__bankcache.keys())]
236

    
237
        self._lock.acquire()
238
        try:
239
            self._maybe_send_magic()
240
            banks = ic9x_ll.get_banks(self.pipe, self.vfo)
241
        except:
242
            self._lock.release()
243
            raise
244

    
245
        self._lock.release()
246

    
247
        i = 0
248
        for bank in banks:
249
            self.__bankcache[i] = bank
250
            i += 1
251

    
252
        return banks
253

    
254
    def _ic9x_set_banks(self, banks):
255

    
256
        if len(banks) != len(self.__bankcache.keys()):
257
            raise errors.InvalidDataError("Invalid bank list length (%i:%i)" %
258
                                          (len(banks),
259
                                           len(self.__bankcache.keys())))
260

    
261
        cached_names = [str(self.__bankcache[x])
262
                        for x in sorted(self.__bankcache.keys())]
263

    
264
        need_update = False
265
        for i in range(0, 26):
266
            if banks[i] != cached_names[i]:
267
                need_update = True
268
                self.__bankcache[i] = banks[i]
269
                LOG.dbeug("Updating %s: %s -> %s" %
270
                          (chr(i + ord("A")), cached_names[i], banks[i]))
271

    
272
        if need_update:
273
            self._lock.acquire()
274
            try:
275
                self._maybe_send_magic()
276
                ic9x_ll.set_banks(self.pipe, self.vfo, banks)
277
            except:
278
                self._lock.release()
279
                raise
280

    
281
            self._lock.release()
282

    
283
    def get_sub_devices(self):
284
        return [IC9xRadioA(self.pipe), IC9xRadioB(self.pipe)]
285

    
286
    def get_features(self):
287
        rf = chirp_common.RadioFeatures()
288
        rf.has_sub_devices = True
289
        rf.valid_special_chans = IC9X_SPECIAL[self.vfo].keys()
290

    
291
        return rf
292

    
293

    
294
class IC9xRadioA(IC9xRadio):
295
    """IC9x Band A subdevice"""
296
    VARIANT = "Band A"
297
    vfo = 1
298
    _upper = 849
299

    
300
    def get_features(self):
301
        rf = chirp_common.RadioFeatures()
302
        rf.has_bank = True
303
        rf.has_bank_index = True
304
        rf.has_bank_names = True
305
        rf.memory_bounds = (0, self._upper)
306
        rf.valid_modes = ["FM", "WFM", "AM"]
307
        rf.valid_tmodes = ["", "Tone", "TSQL", "DTCS"]
308
        rf.valid_duplexes = ["", "-", "+"]
309
        rf.valid_tuning_steps = list(chirp_common.TUNING_STEPS)
310
        rf.valid_bands = [(500000, 9990000000)]
311
        rf.valid_skips = ["", "S", "P"]
312
        rf.valid_characters = CHARSET
313
        rf.valid_name_length = 8
314
        return rf
315

    
316

    
317
class IC9xRadioB(IC9xRadio, chirp_common.IcomDstarSupport):
318
    """IC9x Band B subdevice"""
319
    VARIANT = "Band B"
320
    vfo = 2
321
    _upper = 449
322

    
323
    MYCALL_LIMIT = (1, 7)
324
    URCALL_LIMIT = (1, 61)
325
    RPTCALL_LIMIT = (1, 61)
326

    
327
    def get_features(self):
328
        rf = chirp_common.RadioFeatures()
329
        rf.has_bank = True
330
        rf.has_bank_index = True
331
        rf.has_bank_names = True
332
        rf.requires_call_lists = False
333
        rf.memory_bounds = (0, self._upper)
334
        rf.valid_modes = ["FM", "NFM", "AM", "DV"]
335
        rf.valid_tmodes = ["", "Tone", "TSQL", "DTCS"]
336
        rf.valid_duplexes = ["", "-", "+"]
337
        rf.valid_tuning_steps = list(chirp_common.TUNING_STEPS)
338
        rf.valid_bands = [(118000000, 174000000), (350000000, 470000000)]
339
        rf.valid_skips = ["", "S", "P"]
340
        rf.valid_characters = CHARSET
341
        rf.valid_name_length = 8
342
        return rf
343

    
344
    def __init__(self, *args, **kwargs):
345
        IC9xRadio.__init__(self, *args, **kwargs)
346

    
347
        self.__rcalls = []
348
        self.__mcalls = []
349
        self.__ucalls = []
350

    
351
    def __get_call_list(self, cache, cstype, ulimit):
352
        if cache:
353
            return cache
354

    
355
        calls = []
356

    
357
        self._maybe_send_magic()
358
        for i in range(ulimit - 1):
359
            call = ic9x_ll.get_call(self.pipe, cstype, i+1)
360
            calls.append(call)
361

    
362
        return calls
363

    
364
    def __set_call_list(self, cache, cstype, ulimit, calls):
365
        for i in range(ulimit - 1):
366
            blank = " " * 8
367

    
368
            try:
369
                acall = cache[i]
370
            except IndexError:
371
                acall = blank
372

    
373
            try:
374
                bcall = calls[i]
375
            except IndexError:
376
                bcall = blank
377

    
378
            if acall == bcall:
379
                continue    # No change to this one
380

    
381
            self._maybe_send_magic()
382
            ic9x_ll.set_call(self.pipe, cstype, i + 1, calls[i])
383

    
384
        return calls
385

    
386
    def get_mycall_list(self):
387
        self.__mcalls = self.__get_call_list(self.__mcalls,
388
                                             ic9x_ll.IC92MyCallsignFrame,
389
                                             self.MYCALL_LIMIT[1])
390
        return self.__mcalls
391

    
392
    def get_urcall_list(self):
393
        self.__ucalls = self.__get_call_list(self.__ucalls,
394
                                             ic9x_ll.IC92YourCallsignFrame,
395
                                             self.URCALL_LIMIT[1])
396
        return self.__ucalls
397

    
398
    def get_repeater_call_list(self):
399
        self.__rcalls = self.__get_call_list(self.__rcalls,
400
                                             ic9x_ll.IC92RepeaterCallsignFrame,
401
                                             self.RPTCALL_LIMIT[1])
402
        return self.__rcalls
403

    
404
    def set_mycall_list(self, calls):
405
        self.__mcalls = self.__set_call_list(self.__mcalls,
406
                                             ic9x_ll.IC92MyCallsignFrame,
407
                                             self.MYCALL_LIMIT[1],
408
                                             calls)
409

    
410
    def set_urcall_list(self, calls):
411
        self.__ucalls = self.__set_call_list(self.__ucalls,
412
                                             ic9x_ll.IC92YourCallsignFrame,
413
                                             self.URCALL_LIMIT[1],
414
                                             calls)
415

    
416
    def set_repeater_call_list(self, calls):
417
        self.__rcalls = self.__set_call_list(self.__rcalls,
418
                                             ic9x_ll.IC92RepeaterCallsignFrame,
419
                                             self.RPTCALL_LIMIT[1],
420
                                             calls)
421

    
422

    
423
def _test():
424
    import serial
425
    ser = IC9xRadioB(serial.Serial(port="/dev/ttyUSB1",
426
                                   baudrate=38400, timeout=0.1))
427
    print ser.get_urcall_list()
428
    print "-- FOO --"
429
    ser.set_urcall_list(["K7TAY", "FOOBAR", "BAZ"])
430

    
431

    
432
if __name__ == "__main__":
433
    _test()
    (1-1/1)