# Copyright 2017 Pavel Milanes, CO7WT
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
import struct
import time
from textwrap import dedent
from time import sleep

from chirp import chirp_common, directory, memmap
from chirp import bitwise, errors, util

LOG = logging.getLogger(__name__)

# A note about the memory in these radios
# mainly speculation until proven otherwise:
#
# The '9100' OEM software only manipulates the lower 0x180 bytes on read/write
# operations as we know, the file generated by the OEM software IS NOT an exact
# eeprom image, it's a crude text file with a pseudo csv format

MEM_SIZE = 0x180  # 384 bytes
BLOCK_SIZE = 0x10
ACK_CMD = "\x06"
MODES = ["FM", "NFM"]
SKIP_VALUES = ["S", ""]

# This is a general serial timeout for all serial read functions.
# Practice has shown that about 0.7 sec will be enough to cover all radios.
STIMEOUT = 1

# This flag controls the verbosity of the debug log. The intended default is
# low (False); it is currently True, which produces a very verbose debug.log,
# while this driver is still under development.
debug = True

# #### ID strings #####################################################

# BF-T1 handheld
BFT1_magic = "\x05PROGRAM"
BFT1_ident = "\x20\x42\x46\x39\x31\x30\x30\x53"  # " BF9100S"


def _clean_buffer(radio):
    """Drain any pending bytes from the serial read buffer.

    The serial timeout is lowered while draining and is always restored to
    STIMEOUT on exit, including when an error is raised.

    Raises:
        errors.RadioError: if more than 101 bytes keep arriving (most likely
            a wrong serial port) or if the serial read itself fails.
    """

    # touching the serial timeout to optimize the flushing
    # restored at the end to the default value
    radio.pipe.timeout = 0.1
    dump = "1"
    datacount = 0

    try:
        while dump:
            dump = radio.pipe.read(100)
            datacount += len(dump)
            # hard limit to survive an infinite serial data stream
            # 5 times bigger than a normal rx block (20 bytes)
            if datacount > 101:
                seriale = "Please check your serial port selection."
                raise errors.RadioError(seriale)
    except errors.RadioError:
        # keep the specific message instead of masking it below
        raise
    except Exception:
        raise errors.RadioError("Unknown error cleaning the serial buffer")
    finally:
        # restore the default serial timeout, even on failure
        radio.pipe.timeout = STIMEOUT


def _rawrecv(radio, amount=0):
    """Read raw data from the radio.

    Args:
        radio: radio object whose `pipe` is the serial port.
        amount: number of bytes to read; 0 calls read() with no argument.

    Returns:
        The data read (never empty).

    Raises:
        errors.RadioError: if the read fails or no data is received.
    """

    try:
        if amount == 0:
            data = radio.pipe.read()
        else:
            data = radio.pipe.read(amount)
    except Exception:
        raise errors.RadioError("Error reading data from radio")

    # DEBUG
    if debug is True:
        LOG.debug("<== (%d) bytes:\n\n%s" %
                  (len(data), util.hexprint(data)))

    # fail if no data is received
    if len(data) == 0:
        raise errors.RadioError("No data received from radio")

    return data


def _send(radio, data):
    """Write `data` to the radio.

    Raises:
        errors.RadioError: if the serial write fails.
    """

    try:
        radio.pipe.write(data)
    except Exception:
        raise errors.RadioError("Error sending data to radio")

    # DEBUG
    if debug is True:
        LOG.debug("==> (%d) bytes:\n\n%s" %
                  (len(data), util.hexprint(data)))


def _make_frame(cmd, addr, data=""):
    """Build a frame: 4 byte header followed by the optional payload.

    The header is packed big-endian as: command byte, 16 bit address and
    the block size (always BLOCK_SIZE).

    Args:
        cmd: one character command (this driver uses "R" and "W").
        addr: memory address of the block.
        data: payload to append after the header, if any.

    Returns:
        The packed frame.
    """

    frame = struct.pack(">BHB", ord(cmd), addr, BLOCK_SIZE)

    # add the data if set
    if len(data) != 0:
        frame += data

    return frame


def _recv(radio, addr):
    """Receive one data block from the radio and validate its header.

    Args:
        radio: radio object whose `pipe` is the serial port.
        addr: address expected in the header of the answer.

    Returns:
        The BLOCK_SIZE bytes of payload.

    Raises:
        errors.RadioError: on a wrong block length or an invalid header.
    """

    # Get the full 20 bytes at a time
    # 4 bytes header + 16 bytes of data (BLOCK_SIZE)
    expected = BLOCK_SIZE + 4

    # get the whole block
    block = _rawrecv(radio, expected)

    # short answer
    if len(block) < expected:
        raise errors.RadioError("Wrong block length (short) at 0x%04x" % addr)

    # long answer
    if len(block) > expected:
        raise errors.RadioError("Wrong block length (long) at 0x%04x" % addr)

    # header validation
    c, a, length = struct.unpack(">cHB", block[0:4])
    if c != "W" or a != addr or length != BLOCK_SIZE:
        LOG.debug("Invalid header for block 0x%04x:" % addr)
        LOG.debug("CMD: %s  ADDR: %04x  SIZE: %02x" % (c, a, length))
        raise errors.RadioError("Invalid header for block 0x%04x:" % addr)

    # return the data, 16 bytes of payload
    return block[4:]


def _start_clone_mode(radio, status):
    """Put the radio in clone mode, 3 tries.

    Sends the magic string and waits for the ACK byte, retrying up to
    status.max times.

    Returns:
        True if the radio answered with ACK_CMD, False if all tries failed.

    Raises:
        errors.RadioError: on a serial failure.
    """

    # cleaning the serial buffer
    _clean_buffer(radio)

    # prep the data to show in the UI
    status.cur = 0
    status.msg = "Identifying the radio..."
    status.max = 3
    radio.status_fn(status)

    try:
        for attempt in range(0, status.max):
            # Update the UI
            status.cur = attempt + 1
            radio.status_fn(status)

            # send the magic word
            _send(radio, radio._magic)

            # Now you get a x06 of ACK if all goes well.
            # Read directly from the pipe: _rawrecv() raises when nothing
            # is received, which would abort on the first silent try
            # instead of retrying.
            ack = radio.pipe.read(1)

            # DEBUG
            if debug is True:
                LOG.debug("<== (%d) bytes:\n\n%s" %
                          (len(ack), util.hexprint(ack)))

            if ack == ACK_CMD:
                # DEBUG
                LOG.info("Magic ACK received")
                status.cur = status.max
                radio.status_fn(status)

                return True

        return False

    except errors.RadioError:
        raise
    except Exception as e:
        raise errors.RadioError("Error sending Magic to radio:\n%s" % e)


def _do_ident(radio, status):
    """Put the radio in PROGRAM mode & identify it.

    Returns:
        True on a positive identification.

    Raises:
        errors.RadioError: if the radio does not enter clone mode, sends an
            unexpected ident string or fails the final handshake.
    """

    # set the serial discipline (default)
    radio.pipe.baudrate = 9600
    radio.pipe.parity = "N"
    radio.pipe.bytesize = 8
    radio.pipe.stopbits = 1
    radio.pipe.timeout = STIMEOUT
    radio.pipe.flush()

    # open the radio into program mode
    if _start_clone_mode(radio, status) is False:
        raise errors.RadioError("Radio did not enter clone mode, wrong model?")

    # Ok, poke it to get the ident string
    _send(radio, "\x02")
    ident = _rawrecv(radio, len(radio._id))

    # basic check for the ident
    if len(ident) != len(radio._id):
        raise errors.RadioError("Radio send a odd identification block.")

    # check if ident is OK
    if ident != radio._id:
        LOG.debug("Incorrect model ID, got this:\n\n" + util.hexprint(ident))
        raise errors.RadioError("Radio identification failed.")

    # handshake
    _send(radio, ACK_CMD)
    ack = _rawrecv(radio, 1)

    # checking handshake
    if len(ack) == 1 and ack == ACK_CMD:
        # DEBUG
        LOG.info("ID ACK received")
    else:
        LOG.debug("Radio handshake failed.")
        raise errors.RadioError("Radio handshake failed.")

    # DEBUG
    LOG.info("Positive ident, this is a %s %s" % (radio.VENDOR, radio.MODEL))

    return True


def _download(radio):
    """Read MEM_SIZE bytes from the radio, one BLOCK_SIZE block at a time.

    Returns:
        The memory contents as a single string.

    Raises:
        errors.RadioError: on any identification or transfer failure.
    """

    # UI progress
    status = chirp_common.Status()

    # put radio in program mode and identify it
    _do_ident(radio, status)

    # reset the progress bar in the UI
    status.max = MEM_SIZE // BLOCK_SIZE
    status.msg = "Cloning from radio..."
    status.cur = 0
    radio.status_fn(status)

    # cleaning the serial buffer
    _clean_buffer(radio)

    # increasing the timeout in the "discovery" process.
    radio.pipe.timeout = 3  # 3 seconds.

    blocks = []
    for addr in range(0, MEM_SIZE, BLOCK_SIZE):
        # sending the read request
        _send(radio, _make_frame("R", addr))

        # read and aggregate the data
        blocks.append(_recv(radio, addr))

        # UI Update
        status.cur = addr // BLOCK_SIZE
        status.msg = "Cloning from radio..."
        radio.status_fn(status)

    return "".join(blocks)


def _upload(radio):
    """Write the radio's memory map to the device, block by block.

    Not called at the moment: BFT1.sync_out() has the upload disabled.

    Raises:
        errors.RadioError: on identification failure or a missing/bad ACK.
    """

    # UI progress
    status = chirp_common.Status()

    # put radio in program mode and identify it
    _do_ident(radio, status)

    # get the data to upload to radio
    data = radio.get_mmap()

    # Reset the UI progress
    status.max = MEM_SIZE // BLOCK_SIZE
    status.cur = 0
    status.msg = "Cloning to radio..."
    radio.status_fn(status)

    # cleaning the serial buffer
    _clean_buffer(radio)

    # the fun start here
    for addr in range(0, MEM_SIZE, BLOCK_SIZE):
        # getting the block of data to send
        d = data[addr:addr + BLOCK_SIZE]

        # build the frame to send (the header already carries BLOCK_SIZE)
        frame = _make_frame("W", addr, d)

        # send the frame
        _send(radio, frame)

        # receiving the response
        ack = _rawrecv(radio, 1)

        # basic check
        if len(ack) != 1:
            raise errors.RadioError("No ACK when writing block 0x%04x" % addr)

        if ack != ACK_CMD:
            raise errors.RadioError("Bad ACK writing block 0x%04x:" % addr)

        # UI Update
        status.cur = addr // BLOCK_SIZE
        status.msg = "Cloning to radio..."
        radio.status_fn(status)


def _split(rf, f1, f2):
    """Tell whether two frequencies form a cross-band split.

    Returns:
        False if both freqs are inside the same band of rf.valid_bands
        (no split), True otherwise.
    """

    # determine if the two freqs are in the same band
    for low, high in rf.valid_bands:
        if low <= f1 <= high and low <= f2 <= high:
            # if the two freqs are on the same Band this is not a split
            return False

    # if you get here is because the freq pairs are split
    return True


# ~ def model_match(cls, data):
#     ~ """Match the opened/downloaded image to the correct version"""
#
#     ~ # by now just size match
#     ~ return False


MEM_FORMAT = """
#seekto 0x0000;
struct {
  u8 unknown0;
  lbcd rxfreq[3];
  u8 unknown1[4];
  lbcd txfreq[3];
  u8 unknown2[5];
} memory[20];

"""


@directory.register
class BFT1(chirp_common.CloneModeRadio, chirp_common.ExperimentalRadio):
    """Baofeng BF-T1 radio & possibly alike radios"""
    VENDOR = "Baofeng"
    MODEL = "BF-T1"
    _power_levels = [chirp_common.PowerLevel("High", watts=5),
                     chirp_common.PowerLevel("Low", watts=1)]
    _vhf_range = (136000000, 174000000)
    _uhf_range = (400000000, 470000000)
    # highest valid memory index (memory_bounds is 0.._upper)
    _upper = 19
    _magic = BFT1_magic
    _id = BFT1_ident

    @classmethod
    def get_prompts(cls):
        """Return the prompts shown to the user before download/upload."""
        rp = chirp_common.RadioPrompts()
        rp.experimental = \
            ('This driver is experimental.\n'
             '\n'
             'Please keep a copy of your memories with the original software '
             'if you treasure them, this driver is new and may contain'
             ' bugs.\n'
             '\n'
             )
        rp.pre_download = _(dedent("""\
            Follow these instructions to download your info:

            1 - Turn off your radio
            2 - Connect your interface cable
            3 - Turn on your radio
            4 - Do the download of your radio data
            """))
        rp.pre_upload = _(dedent("""\
            Follow these instructions to upload your info:

            1 - Turn off your radio
            2 - Connect your interface cable
            3 - Turn on your radio
            4 - Do the upload of your radio data
            """))
        return rp

    def get_features(self):
        """Get the radio's features"""

        # we will use the following var as global
        global POWER_LEVELS

        rf = chirp_common.RadioFeatures()
        # ~ rf.has_settings = True
        # ~ rf.has_bank = False
        # ~ rf.has_tuning_step = False
        # ~ rf.can_odd_split = True
        # ~ rf.has_name = True
        rf.has_offset = True
        rf.has_mode = True
        rf.valid_modes = MODES
        # ~ rf.has_dtcs = True
        # ~ rf.has_rx_dtcs = True
        # ~ rf.has_dtcs_polarity = True
        # ~ rf.has_ctone = True
        # ~ rf.has_cross = True
        # ~ rf.valid_characters = VALID_CHARS
        # ~ rf.valid_name_length = self.NAME_LENGTH
        rf.valid_duplexes = ["", "-", "+", "split", "off"]
        # ~ rf.valid_tmodes = ['', 'Tone', 'TSQL', 'DTCS', 'Cross']
        # ~ rf.valid_cross_modes = [
        #     ~ "Tone->Tone",
        #     ~ "DTCS->",
        #     ~ "->DTCS",
        #     ~ "Tone->DTCS",
        #     ~ "DTCS->Tone",
        #     ~ "->Tone",
        #     ~ "DTCS->DTCS"]
        rf.valid_skips = SKIP_VALUES
        # ~ rf.valid_dtcs_codes = DTCS
        rf.memory_bounds = (0, self._upper)

        # power levels
        POWER_LEVELS = self._power_levels
        rf.valid_power_levels = POWER_LEVELS

        # normal dual bands
        rf.valid_bands = [self._vhf_range, self._uhf_range]

        return rf

    def process_mmap(self):
        """Process the mem map into the mem object"""

        # Get it
        self._memobj = bitwise.parse(MEM_FORMAT, self._mmap)

    def sync_in(self):
        """Download from radio"""
        data = _download(self)
        self._mmap = memmap.MemoryMap(data)
        self.process_mmap()

    def sync_out(self):
        """Upload to radio (currently disabled, always raises RadioError)"""

        # ~ try:
        #     ~ _upload(self)
        # ~ except errors.RadioError:
        #     ~ raise
        # ~ except Exception as e:
        #     ~ raise errors.RadioError("Error: %s" % e)

        # upload disabled by now
        raise errors.RadioError("Error: This is a dev driver, no upload yet.")

    def get_raw_memory(self, number):
        """Return the repr() of the raw memory structure for a channel"""
        return repr(self._memobj.memory[number])

    def get_memory(self, number):
        """Get the mem representation from the radio image"""
        _mem = self._memobj.memory[number]

        # Create a high-level memory object to return to the UI
        mem = chirp_common.Memory()

        # Memory number
        mem.number = number

        # a first byte of 0xFF marks the channel as empty
        if _mem.get_raw()[0] == "\xFF":
            mem.empty = True
            return mem

        # Freq and offset
        mem.freq = int(_mem.rxfreq) * 1000

        # tx freq can be blank (byte 8 is the first byte of txfreq)
        if _mem.get_raw()[8] == "\xFF":
            # TX freq not set
            mem.offset = 0
            mem.duplex = "off"
        else:
            # TX freq set
            txfreq = int(_mem.txfreq) * 1000
            offset = txfreq - mem.freq
            if offset != 0:
                if _split(self.get_features(), mem.freq, txfreq):
                    mem.duplex = "split"
                    mem.offset = txfreq
                elif offset < 0:
                    mem.offset = abs(offset)
                    mem.duplex = "-"
                elif offset > 0:
                    mem.offset = offset
                    mem.duplex = "+"
            else:
                mem.offset = 0

        # ~ # power
        # ~ mem.power = POWER_LEVELS[int(_mem.power)]

        # ~ # wide/narrow
        # ~ mem.mode = MODES[int(_mem.wide)]

        # ~ # skip
        # ~ mem.skip = SKIP_VALUES[_mem.add]

        # ~ # tone data
        # ~ rxtone = txtone = None
        # ~ txtone = self._decode_tone(_mem.txtone)
        # ~ rxtone = self._decode_tone(_mem.rxtone)
        # ~ chirp_common.split_tone_decode(mem, txtone, rxtone)

        return mem

    def set_memory(self, mem):
        """Set the memory data in the eeprom img from the UI"""
        # get the eeprom representation of this channel
        _mem = self._memobj.memory[mem.number]

        # if empty memory: erase the whole channel and stop here, otherwise
        # the code below would write frequency data over the erased channel
        if mem.empty:
            _mem.set_raw("\xFF" * 16)
            return mem

        # NOTE(review): get_memory() treats a channel whose first byte
        # (unknown0) is 0xFF as empty and this method never writes that
        # byte, so a channel stored over an erased slot will still read back
        # as empty -- confirm the meaning of unknown0.

        # frequency
        _mem.rxfreq = mem.freq // 1000

        # duplex
        if mem.duplex == "+":
            _mem.txfreq = (mem.freq + mem.offset) // 1000
        elif mem.duplex == "-":
            _mem.txfreq = (mem.freq - mem.offset) // 1000
        elif mem.duplex == "off":
            # blank TX freq
            for i in _mem.txfreq:
                i.set_raw("\xFF")
        elif mem.duplex == "split":
            _mem.txfreq = mem.offset // 1000
        else:
            _mem.txfreq = mem.freq // 1000

        # ~ # tone data
        # ~ ((txmode, txtone, txpol), (rxmode, rxtone, rxpol)) = \
        #     ~ chirp_common.split_tone_encode(mem)
        # ~ self._encode_tone(_mem.txtone, txmode, txtone, txpol)
        # ~ self._encode_tone(_mem.rxtone, rxmode, rxtone, rxpol)

        return mem

    @classmethod
    def match_model(cls, filedata, filename):
        """Tell whether an image belongs to this driver (size match only)"""
        # ~ match_model = False

        LOG.debug("len file/mem %i/%i" % (len(filedata), MEM_SIZE))

        # testing the file data size
        match_size = len(filedata) == MEM_SIZE

        # DEBUG
        if match_size and debug is True:
            LOG.debug("BF-T1 matched!")

        # testing the firmware model fingerprint
        # ~ match_model = model_match(cls, filedata)

        return match_size  # and match_model