# Copyright 2017 Pavel Milanes, CO7WT, # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import time import struct import logging LOG = logging.getLogger(__name__) from time import sleep from chirp import chirp_common, directory, memmap from chirp import bitwise, errors, util from chirp.settings import RadioSetting, RadioSettingGroup, \ RadioSettingValueBoolean, RadioSettingValueList, \ RadioSettingValueInteger, RadioSettingValueString, \ RadioSettingValueFloat, RadioSettings from textwrap import dedent # A note about the memmory in these radios # mainly speculation until proven otherwise: # # The '9100' OEM software only manipulates the lower 0x180 bytes on read/write # operations as we know, the file generated by the OEM software IN NOT an exact # eeprom image, it's a crude text file with a pseudo csv format MEM_SIZE = 0x180 # 384 bytes BLOCK_SIZE = 0x10 ACK_CMD = "\x06" MODES = ["NFM", "FM"] SKIP_VALUES = ["S", ""] TONES = chirp_common.TONES DTCS = sorted(chirp_common.DTCS_CODES + [645]) # This is a general serial timeout for all serial read functions. # Practice has show that about 0.07 sec will be enough to cover all radios. STIMEOUT = 0.07 # this var controls the verbosity in the debug and by default it's low (False) # make it True and you will to get a very verbose debug.log debug = True ##### ID strings ##################################################### # BF-T1 handheld BFT1_magic = "\x05PROGRAM" BFT1_ident = " BF9100S" def _clean_buffer(radio): """Cleaning the read serial buffer, hard timeout to survive an infinite data stream""" dump = "1" datacount = 0 try: while len(dump) > 0: dump = radio.pipe.read(100) datacount += len(dump) # hard limit to survive a infinite serial data stream # 5 times bigger than a normal rx block (20 bytes) if datacount > 101: seriale = "Please check your serial port selection." raise errors.RadioError(seriale) except Exception: raise errors.RadioError("Unknown error cleaning the serial buffer") def _rawrecv(radio, amount = 0): """Raw read from the radio device""" # var to hold the data to return data = "" try: if amount == 0: data = radio.pipe.read() else: data = radio.pipe.read(amount) # DEBUG if debug is True: LOG.debug("<== (%d) bytes:\n\n%s" % (len(data), util.hexprint(data))) # fail if no data is received if len(data) == 0: raise errors.RadioError("No data received from radio") except: raise errors.RadioError("Error reading data from radio") return data def _send(radio, data): """Send data to the radio device""" try: radio.pipe.write(data) # DEBUG if debug is True: LOG.debug("==> (%d) bytes:\n\n%s" % (len(data), util.hexprint(data))) except: raise errors.RadioError("Error sending data to radio") def _make_frame(cmd, addr, data=""): """Pack the info in the header format""" frame = struct.pack(">BHB", ord(cmd), addr, BLOCK_SIZE) # add the data if set if len(data) != 0: frame += data return frame def _recv(radio, addr): """Get data from the radio""" # Get the full 20 bytes at a time # 4 bytes header + 16 bytes of data (BLOCK_SIZE) # get the whole block block = _rawrecv(radio, BLOCK_SIZE + 4) # short answer if len(block) < (BLOCK_SIZE + 4): raise errors.RadioError("Wrong block length (short) at 0x%04x" % addr) # long answer if len(block) > (BLOCK_SIZE + 4): raise errors.RadioError("Wrong block length (long) at 0x%04x" % addr) # header validation c, a, l = struct.unpack(">cHB", block[0:4]) if c != "W" or a != addr or l != BLOCK_SIZE: LOG.debug("Invalid header for block 0x%04x:" % addr) LOG.debug("CMD: %s ADDR: %04x SIZE: %02x" % (c, a, l)) raise errors.RadioError("Invalid header for block 0x%04x:" % addr) # return the data, 16 bytes of payload return block[4:] def _start_clone_mode(radio, status): """Put the radio in clone mode, 3 tries""" # cleaning the serial buffer _clean_buffer(radio) # prep the data to show in the UI status.cur = 0 status.msg = "Identifying the radio..." status.max = 3 radio.status_fn(status) try: for a in range(0, status.max): # Update the UI status.cur = a + 1 radio.status_fn(status) # send the magic word _send(radio, radio._magic) # Now you get a x06 of ACK if all goes well ack = _rawrecv(radio, 1) if ack == ACK_CMD: # DEBUG LOG.info("Magic ACK received") status.cur = status.max radio.status_fn(status) return True return False except errors.RadioError: raise except Exception, e: raise errors.RadioError("Error sending Magic to radio:\n%s" % e) def _do_ident(radio, status): """Put the radio in PROGRAM mode & identify it""" # set the serial discipline (default) radio.pipe.baudrate = 9600 radio.pipe.parity = "N" radio.pipe.bytesize = 8 radio.pipe.stopbits = 1 radio.pipe.timeout = STIMEOUT # open the radio into program mode if _start_clone_mode(radio, status) is False: raise errors.RadioError("Radio did not enter clone mode, wrong model?") # Ok, poke it to get the ident string _send(radio, "\x02") ident = _rawrecv(radio, len(radio._id)) # basic check for the ident if len(ident) != len(radio._id): raise errors.RadioError("Radio send a odd identification block.") # check if ident is OK if ident != radio._id: LOG.debug("Incorrect model ID, got this:\n\n" + util.hexprint(ident)) raise errors.RadioError("Radio identification failed.") # handshake _send(radio, ACK_CMD) ack = _rawrecv(radio, 1) #checking handshake if len(ack) == 1 and ack == ACK_CMD: # DEBUG LOG.info("ID ACK received") else: LOG.debug("Radio handshake failed.") raise errors.RadioError("Radio handshake failed.") # DEBUG LOG.info("Positive ident, this is a %s %s" % (radio.VENDOR, radio.MODEL)) return True def _download(radio): """Get the memory map""" # UI progress status = chirp_common.Status() # put radio in program mode and identify it _do_ident(radio, status) # reset the progress bar in the UI status.max = MEM_SIZE / BLOCK_SIZE status.msg = "Cloning from radio..." status.cur = 0 radio.status_fn(status) # cleaning the serial buffer _clean_buffer(radio) data = "" for addr in range(0, MEM_SIZE, BLOCK_SIZE): # sending the read request _send(radio, _make_frame("R", addr)) # read d = _recv(radio, addr) # aggregate the data data += d # UI Update status.cur = addr / BLOCK_SIZE status.msg = "Cloning from radio..." radio.status_fn(status) # close comms with the radio _send(radio, "\x62") # DEBUG LOG.info("Close comms cmd sent, radio must reboot now.") return data def _upload(radio): """Upload procedure""" # UI progress status = chirp_common.Status() # put radio in program mode and identify it _do_ident(radio, status) # get the data to upload to radio data = radio.get_mmap() # Reset the UI progress status.max = MEM_SIZE / BLOCK_SIZE status.cur = 0 status.msg = "Cloning to radio..." radio.status_fn(status) # cleaning the serial buffer _clean_buffer(radio) # the fun start here for addr in range(0, MEM_SIZE, BLOCK_SIZE): # getting the block of data to send d = data[addr:addr + BLOCK_SIZE] # build the frame to send frame = _make_frame("W", addr, d) # send the frame _send(radio, frame) # receiving the response ack = _rawrecv(radio, 1) # basic check if len(ack) != 1: raise errors.RadioError("No ACK when writing block 0x%04x" % addr) if ack != ACK_CMD: raise errors.RadioError("Bad ACK writing block 0x%04x:" % addr) # UI Update status.cur = addr / BLOCK_SIZE status.msg = "Cloning to radio..." radio.status_fn(status) # close comms with the radio _send(radio, "\x62") # DEBUG LOG.info("Close comms cmd sent, radio must reboot now.") def _model_match(cls, data): """Match the opened/downloaded image to the correct version""" # we don't have a reliable fingerprint in the mem space by now. # then we just aim for a specific zone filled with \xff rid = data[0x0158:0x0160] if rid == ("\xff" * 8): return True return False def _decode_ranges(low, high): """Unpack the data in the ranges zones in the memmap and return a tuple with the integer corresponding to the Mhz it means""" return (int(low) * 100000, int(high) * 100000) MEM_FORMAT = """ #seekto 0x0000; // normal 1-20 mem channels // channel 0 is Emergent CH struct { lbcd rxfreq[4]; // rx freq. u8 rxtone; // x00 = none // x01 - x32 = index of the analog tones // x33 - x9b = index of Digital tones // Digital tone polarity is handled below lbcd txoffset[4]; // the difference against RX // pending to find the offset polarity in settings u8 txtone; // Idem to rxtone u8 noskip:1, // if true is included in the scan wide:1, // 1 = Wide, 0 = Narrow ttondinv:1, // if true TX tone is Digital & Inverted unA:1, // rtondinv:1, // if true RX tone is Digital & Inverted unB:1, // offplus:1, // TX = RX + offset offminus:1; // TX = RX - offset u8 empty[5]; } memory[21]; #seekto 0x0150; // Unknown data... settings? struct { lbcd vhfl[2]; // VHF low limit lbcd vhfh[2]; // VHF high limit lbcd uhfl[2]; // UHF low limit lbcd uhfh[2]; // UHF high limit u8 finger[8]; // can we use this as fingerprint "\xFF" * 16 u8 unk0; u8 unk1; u8 squelch; u8 vox; u8 timeout; u8 batsave:1, fm_funct:1, ste:1, blo:1, beep:1, lock:1, backlight:2; u8 scantype; u8 channel; u8 fmrange; u8 alarm; u8 voice; u8 volume; u16 fmvfo; u8 relaym; u8 tx_pwr; } settings; #seekto 0x0170; // Relay CH: same structure of memory ? struct { lbcd rxfreq[4]; // rx freq. u8 rxtone; // x00 = none // x01 - x32 = index of the analog tones // x33 - x9b = index of Digital tones // Digital tone polarity is handled below lbcd txoffset[4]; // the difference against RX // pending to find the offset polarity in settings u8 txtone; // Idem to rxtone u8 noskip:1, // if true is included in the scan wide:1, // 1 = Wide, 0 = Narrow ttondinv:1, // if true TX tone is Digital & Inverted unC:1, // rtondinv:1, // if true RX tone is Digital & Inverted unD:1, // offplus:1, // TX = RX + offset offminus:1; // TX = RX - offset u8 empty[5]; } relaych; """ @directory.register class BFT1(chirp_common.CloneModeRadio, chirp_common.ExperimentalRadio): """Baofeng BT-F1 radio & possibly alike radios""" VENDOR = "Baofeng" MODEL = "BF-T1" _vhf_range = (136000000, 174000000) _uhf_range = (400000000, 470000000) _upper = 20 _magic = BFT1_magic _id = BFT1_ident @classmethod def get_prompts(cls): rp = chirp_common.RadioPrompts() rp.experimental = \ ('This driver is experimental.\n' '\n' 'Please keep a copy of your memories with the original software ' 'if you treasure them, this driver is new and may contain' ' bugs.\n' '\n' 'Channel Zero is "Emergent CH", "Relay CH" is not implemented yet,' 'FM VFO and Limits are still missing in settings tab.' ) rp.pre_download = _(dedent("""\ Follow these instructions to download your info: 1 - Turn off your radio 2 - Connect your interface cable 3 - Turn on your radio 4 - Do the download of your radio data """)) rp.pre_upload = _(dedent("""\ Follow these instructions to upload your info: 1 - Turn off your radio 2 - Connect your interface cable 3 - Turn on your radio 4 - Do the upload of your radio data """)) return rp def get_features(self): """Get the radio's features""" rf = chirp_common.RadioFeatures() rf.has_settings = True rf.has_bank = False rf.has_tuning_step = False rf.can_odd_split = True rf.has_name = False rf.has_offset = True rf.has_mode = True rf.valid_modes = MODES rf.has_dtcs = True rf.has_rx_dtcs = True rf.has_dtcs_polarity = True rf.has_ctone = True rf.has_cross = True rf.valid_duplexes = ["", "-", "+", "split"] rf.valid_tmodes = ['', 'Tone', 'TSQL', 'DTCS', 'Cross'] rf.valid_cross_modes = [ "Tone->Tone", "DTCS->", "->DTCS", "Tone->DTCS", "DTCS->Tone", "->Tone", "DTCS->DTCS"] rf.valid_skips = SKIP_VALUES rf.valid_dtcs_codes = DTCS rf.memory_bounds = (0, self._upper) # normal dual bands rf.valid_bands = [self._vhf_range, self._uhf_range] return rf def process_mmap(self): """Process the mem map into the mem object""" # Get it self._memobj = bitwise.parse(MEM_FORMAT, self._mmap) # set the band limits as the memmap settings = self._memobj.settings self._vhf_range = _decode_ranges(settings.vhfl, settings.vhfh) self._uhf_range = _decode_ranges(settings.uhfl, settings.uhfh) def sync_in(self): """Download from radio""" data = _download(self) self._mmap = memmap.MemoryMap(data) self.process_mmap() def sync_out(self): """Upload to radio""" try: _upload(self) except errors.RadioError: raise except Exception, e: raise errors.RadioError("Error: %s" % e) def _decode_tone(self, val, inv): """Parse the tone data to decode from mem, it returns: Mode (''|DTCS|Tone), Value (None|###), Polarity (None,N,R)""" if val == 0: return '', None, None elif val < 51: # analog tone return 'Tone', TONES[val - 1], None elif val > 50: # digital tone pol = "N" # polarity? if inv == 1: pol = "R" return 'DTCS', DTCS[val - 51], pol def _encode_tone(self, memtone, meminv, mode, tone, pol): """Parse the tone data to encode from UI to mem""" if mode == '' or mode is None: memtone.set_value(0) meminv.set_value(0) elif mode == 'Tone': # caching errors for analog tones. try: memtone.set_value(TONES.index(tone) + 1) meminv.set_value(0) except: msg = "TCSS Tone '%d' is not supported" % tone LOG.error(msg) raise errors.RadioError(msg) elif mode == 'DTCS': # caching errors for digital tones. try: memtone.set_value(DTCS.index(tone) + 51) if pol == "R": meminv.set_value(True) else: meminv.set_value(False) except: msg = "Digital Tone '%d' is not supported" % tone LOG.error(msg) raise errors.RadioError(msg) else: msg = "Internal error: invalid mode '%s'" % mode LOG.error(msg) raise errors.InvalidDataError(msg) def get_raw_memory(self, number): return repr(self._memobj.memory[number]) def get_memory(self, number): """Get the mem representation from the radio image""" _mem = self._memobj.memory[number] # Create a high-level memory object to return to the UI mem = chirp_common.Memory() # Memory number mem.number = number if _mem.get_raw()[0] == "\xFF": mem.empty = True return mem # Freq and offset mem.freq = int(_mem.rxfreq) * 10 # TX freq (Stored as a difference) mem.offset = int(_mem.txoffset) * 10 mem.duplex = "" # must work out the polarity if mem.offset != 0: if _mem.offminus == 1: mem.duplex = "-" # tx below RX if _mem.offplus == 1: # tx above RX mem.duplex = "+" # split RX/TX in different bands if mem.offset > 71000000: mem.duplex = "split" # show the actual value in the offset, depending on the shift if _mem.offminus == 1: mem.offset = mem.freq - mem.offset if _mem.offplus == 1: mem.offset = mem.freq + mem.offset # wide/narrow mem.mode = MODES[int(_mem.wide)] # skip mem.skip = SKIP_VALUES[_mem.noskip] # tone data rxtone = txtone = None txtone = self._decode_tone(_mem.txtone, _mem.ttondinv) rxtone = self._decode_tone(_mem.rxtone, _mem.rtondinv) chirp_common.split_tone_decode(mem, txtone, rxtone) return mem def set_memory(self, mem): """Set the memory data in the eeprom img from the UI""" # get the eprom representation of this channel _mem = self._memobj.memory[mem.number] # if empty memmory if mem.empty: # the channel itself _mem.set_raw("\xFF" * 16) # return it return mem # frequency _mem.rxfreq = mem.freq / 10 # duplex/ offset Offset is an absolute value _mem.txoffset = mem.offset / 10 # must work out the polarity if mem.duplex == "": _mem.offplus = 0 _mem.offminus = 0 elif mem.duplex == "+": _mem.offplus = 1 _mem.offminus = 0 elif mem.duplex == "-": _mem.offplus = 0 _mem.offminus = 1 elif mem.duplex == "split": if mem.freq > mem.offset: _mem.offplus = 0 _mem.offminus = 1 _mem.txoffset = (mem.freq - mem.offset) / 10 else: _mem.offplus = 1 _mem.offminus = 0 _mem.txoffset = (mem.offset - mem.freq) / 10 # wide/narrow _mem.wide = MODES.index(mem.mode) # skip _mem.noskip = SKIP_VALUES.index(mem.skip) # tone data ((txmode, txtone, txpol), (rxmode, rxtone, rxpol)) = \ chirp_common.split_tone_encode(mem) self._encode_tone(_mem.txtone, _mem.ttondinv, txmode, txtone, txpol) self._encode_tone(_mem.rxtone, _mem.rtondinv, rxmode, rxtone, rxpol) return mem # added this def get_settings(self): _settings = self._memobj.settings basic = RadioSettingGroup("basic", "Basic Settings") fm = RadioSettingGroup("fm", "FM Radio") group = RadioSettings(basic, fm) rs = RadioSetting("channel", "Current Channel", RadioSettingValueInteger(1, 20, _settings.channel)) basic.append(rs) options = ["0.5 Watt", "1.0 Watt"] rs = RadioSetting("tx_pwr", "TX Power", RadioSettingValueList(options, options[_settings.tx_pwr])) basic.append(rs) if _settings.volume > 8: _settings.volume = 8 rs = RadioSetting("volume", "Volume Level", RadioSettingValueInteger(0, 9, _settings.volume)) basic.append(rs) rs = RadioSetting("vox", "VOX Level", RadioSettingValueInteger(0, 9, _settings.vox)) basic.append(rs) options = ["Off"] + ["%s sec" % (x*30) for x in range(1, 7)] rs = RadioSetting("timeout", "Time Out Timer", RadioSettingValueList( options, options[_settings.timeout])) basic.append(rs) options = ["Time", "Carrier", "Search"] rs = RadioSetting("scantype", "Scan Type", RadioSettingValueList(options, options[_settings.scantype])) basic.append(rs) options = ["Off"] + ["%s h" % (x*0.5) for x in range(1, 17)] rs = RadioSetting("alarm", "Alarm Time", RadioSettingValueList( options, options[_settings.alarm])) basic.append(rs) options = ["Off","English", "Chinese"] rs = RadioSetting("voice", "Voice Prompt", RadioSettingValueList(options, options[_settings.voice])) basic.append(rs) options = ["Off", "RX sync", "TX sync"] rs = RadioSetting("relaym", "Relay Mode", RadioSettingValueList(options, options[_settings.relaym])) basic.append(rs) options = ["Off", "Key", "On"] rs = RadioSetting("backlight", "Backlight", RadioSettingValueList(options, options[_settings.backlight])) basic.append(rs) rs = RadioSetting("lock", "Key Lock", RadioSettingValueBoolean(_settings.lock)) basic.append(rs) rs = RadioSetting("beep", "Key Beep", RadioSettingValueBoolean(_settings.beep)) basic.append(rs) rs = RadioSetting("blo", "Busy Lockout", RadioSettingValueBoolean(_settings.blo)) basic.append(rs) rs = RadioSetting("ste", "Squelch Tail Eliminate", RadioSettingValueBoolean(_settings.ste)) basic.append(rs) rs = RadioSetting("batsave", "Battery Save", RadioSettingValueBoolean(_settings.batsave)) basic.append(rs) rs = RadioSetting("fm_funct", "FM Function", RadioSettingValueBoolean(_settings.fm_funct)) fm.append(rs) options = ["76-108", "65-76"] rs = RadioSetting("fmrange", "FM Range", RadioSettingValueList(options, options[_settings.fmrange])) fm.append(rs) fmset = _settings.fmvfo / 10.0 + 65.0 rs = RadioSetting("fmvfo", "FM VFO", RadioSettingValueFloat(65, 108, fmset, 0.1, 1)) def apply_fmset(setting,obj): val = setting.value.get_value() value = int(val * 10 - 650) obj.fmvfo = value rs.set_apply_callback(apply_fmset,self._memobj.settings) fm.append(rs) return group def set_settings(self, settings): _settings = self._memobj.settings for element in settings: if not isinstance(element, RadioSetting): self.set_settings(element) continue else: try: name = element.get_name() if "." in name: bits = name.split(".") obj = self._memobj for bit in bits[:-1]: if "/" in bit: bit, index = bit.split("/", 1) index = int(index) obj = getattr(obj, bit)[index] else: obj = getattr(obj, bit) setting = bits[-1] else: obj = _settings setting = element.get_name() if element.has_apply_callback(): LOG.debug("Using apply callback") element.run_apply_callback() elif setting == "beep_tone_disabled": setattr(obj, setting, not int(element.value)) elif setting == "ste_disabled": setattr(obj, setting, not int(element.value)) else: LOG.debug("Setting %s = %s" % (setting, element.value)) setattr(obj, setting, element.value) except Exception, e: LOG.debug(element.get_name()) raise # experimental end @classmethod def match_model(cls, filedata, filename): match_size = False match_model = False # testing the file data size if len(filedata) == MEM_SIZE: match_size = True # DEBUG if debug is True: LOG.debug("BF-T1 matched!") # testing the firmware model fingerprint match_model = _model_match(cls, filedata) if match_size and match_model: return True else: return False