Project

General

Profile

Bug #11303 » generic_csv.py

fea88606 - Dan Smith, 04/14/2024 05:46 PM

 
# Copyright 2008 Dan Smith <dsmith@danplanet.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
import csv
import logging

from chirp import chirp_common, errors, directory

LOG = logging.getLogger(__name__)
DEFAULT_POWER_LEVEL = chirp_common.AutoNamedPowerLevel(50)


class OmittedHeaderError(Exception):
"""Internal exception to signal that a column has been omitted"""
pass


def get_datum_by_header(headers, data, header):
"""Return the column corresponding to @headers[@header] from @data"""
if header not in headers:
raise OmittedHeaderError("Header %s not provided" % header)

try:
return data[headers.index(header)]
except IndexError:
raise OmittedHeaderError("Header %s not provided on this line" %
header)


def write_memory(writer, mem):
"""Write @mem using @writer if not empty"""
if mem.empty:
return
writer.writerow(mem.to_csv())


def parse_cross_mode(value):
if value not in chirp_common.CROSS_MODES:
raise ValueError('Invalid cross mode %r' % value)
return value


@directory.register
class CSVRadio(chirp_common.FileBackedRadio):
"""A driver for Generic CSV files"""
VENDOR = "Generic"
MODEL = "CSV"
FILE_EXTENSION = "csv"
FORMATS = [directory.register_format('CSV', '*.csv')]

ATTR_MAP = {
"Location": (int, "number"),
"Name": (str, "name"),
"Frequency": (chirp_common.parse_freq, "freq"),
"Duplex": (str, "duplex"),
"Offset": (chirp_common.parse_freq, "offset"),
"Tone": (str, "tmode"),
"rToneFreq": (float, "rtone"),
"cToneFreq": (float, "ctone"),
"DtcsCode": (int, "dtcs"),
"DtcsPolarity": (str, "dtcs_polarity"),
"RxDtcsCode": (int, "rx_dtcs"),
"CrossMode": (parse_cross_mode, "cross_mode"),
"Mode": (str, "mode"),
"TStep": (float, "tuning_step"),
"Skip": (str, "skip"),
"Power": (chirp_common.parse_power, "power"),
"Comment": (str, "comment"),
}

def _blank(self, setDefault=False):
self.errors = []
self.memories = [chirp_common.Memory(i, True) for i in range(0, 1000)]
if (setDefault):
self.memories[0].empty = False
self.memories[0].freq = 146010000
# Default to 50W
self.memories[0].power = DEFAULT_POWER_LEVEL

def __init__(self, pipe):
chirp_common.FileBackedRadio.__init__(self, None)
self.memories = []
self.file_has_rTone = None # Set in load(), used in _clean_tmode()
self.file_has_cTone = None

# Persistence for comment lines
# List of tuples of (previous_memory, comment)
self._comments = []

self._filename = pipe
if self._filename and os.path.exists(self._filename):
self.load()
else:
self._blank(True)

def get_features(self):
rf = chirp_common.RadioFeatures()
rf.has_bank = False
rf.requires_call_lists = False
rf.has_implicit_calls = False
rf.memory_bounds = (0, len(self.memories)-1)
rf.has_infinite_number = True
rf.has_nostep_tuning = True
rf.has_comment = True
rf.has_rx_dtcs = True
rf.has_variable_power = True
rf.can_odd_split = True

rf.valid_modes = list(chirp_common.MODES)
rf.valid_tmodes = list(chirp_common.TONE_MODES)
rf.valid_cross_modes = list(chirp_common.CROSS_MODES)
rf.valid_duplexes = ["", "-", "+", "split", "off"]
rf.valid_tuning_steps = list(chirp_common.TUNING_STEPS)
rf.valid_bands = [(1, 10000000000)]
rf.valid_skips = ["", "S"]
rf.valid_characters = chirp_common.CHARSET_1252
rf.valid_name_length = 999
rf.valid_power_levels = [chirp_common.AutoNamedPowerLevel(0.1),
DEFAULT_POWER_LEVEL,
chirp_common.AutoNamedPowerLevel(1500)]

return rf

def _clean(self, headers, line, mem):
"""Runs post-processing functions on new mem objects.

This is useful for parsing other CSV dialects when multiple columns
convert to a single Chirp column."""

for attr in dir(mem):
fname = "_clean_%s" % attr
if hasattr(self, fname):
mem = getattr(self, fname)(headers, line, mem)

return mem

def _clean_tmode(self, headers, line, mem):
""" If there is exactly one of [rToneFreq, cToneFreq] columns in the
csv file, use it for both rtone & ctone. Makes TSQL use friendlier."""

if self.file_has_rTone and not self.file_has_cTone:
mem.ctone = mem.rtone
elif self.file_has_cTone and not self.file_has_rTone:
mem.rtone = mem.ctone

return mem

def _parse_csv_data_line(self, headers, line):
mem = chirp_common.Memory()
try:
if get_datum_by_header(headers, line, "Mode") == "DV":
mem = chirp_common.DVMemory()
except OmittedHeaderError:
pass

for header in headers:
try:
typ, attr = self.ATTR_MAP[header]
except KeyError:
continue
try:
val = get_datum_by_header(headers, line, header)
if not val and typ == int:
val = None
else:
val = typ(val)
if hasattr(mem, attr):
setattr(mem, attr, val)
except OmittedHeaderError:
pass
except Exception as e:
raise Exception("[%s] %s" % (attr, e))

if not mem.power:
# Default power level to something if not set
mem.power = DEFAULT_POWER_LEVEL

return self._clean(headers, line, mem)

def load(self, filename=None):
if filename is None and self._filename is None:
raise errors.RadioError("Need a location to load from")

if filename:
self._filename = filename

self._blank()

with open(self._filename, newline='', encoding='utf-8-sig') as f:
return self._load(f)

def _load(self, f):
reader = csv.reader(f, delimiter=chirp_common.SEPCHAR, quotechar='"')

self._comments = []
good = 0
lineno = 0
last_number = -1
for line in reader:
# Skip (but stash) comment lines that start with #
if line and line[0].startswith('#'):
self._comments.append((last_number, ' '.join(line)))
continue
lineno += 1
if lineno == 1:
header = line
self.file_has_rTone = "rToneFreq" in header
self.file_has_cTone = "cToneFreq" in header
continue

if len(header) > len(line):
LOG.error("Line %i has %i columns, expected %i",
lineno, len(line), len(header))
self.errors.append("Column number mismatch on line %i" %
lineno)
continue

try:
mem = self._parse_csv_data_line(header, line)
if mem.number is None:
raise Exception("Invalid Location field" % lineno)
except Exception as e:
LOG.error("Line %i: %s", lineno, e)
self.errors.append("Line %i: %s" % (lineno, e))
continue

last_number = mem.number
self._grow(mem.number)
self.memories[mem.number] = mem
good += 1

if not good:
raise errors.InvalidDataError("No channels found")

def save(self, filename=None):
if filename is None and self._filename is None:
raise errors.RadioError("Need a location to save to")

if filename:
self._filename = filename

with open(self._filename, "w", newline='', encoding='utf-8') as f:
comments = list(self._comments)
writer = csv.writer(f, delimiter=chirp_common.SEPCHAR)

for index, comment in comments[:]:
if index >= 0:
break
writer.writerow([comment])
comments.pop(0)

writer.writerow(chirp_common.Memory.CSV_FORMAT)

for mem in self.memories:
for index, comment in comments[:]:
if index >= mem.number:
break
writer.writerow([comment])
comments.pop(0)
write_memory(writer, mem)

# MMAP compatibility
def save_mmap(self, filename):
return self.save(filename)

def load_mmap(self, filename):
return self.load(filename)

def get_memories(self, lo=0, hi=999):
return [x for x in self.memories if x.number >= lo and x.number <= hi]

def get_memory(self, number):
try:
return self.memories[number].dupe()
except:
raise errors.InvalidMemoryLocation("No such memory %s" % number)

def _grow(self, target):
delta = target - len(self.memories)
if delta < 0:
return

delta += 1

for i in range(len(self.memories), len(self.memories) + delta + 1):
mem = chirp_common.Memory()
mem.empty = True
mem.number = i
self.memories.append(mem)

def set_memory(self, newmem):
newmem = newmem.dupe()
if newmem.power is None:
newmem.power = DEFAULT_POWER_LEVEL
else:
# Accept any power level because we are CSV, but convert it to
# the class that will str() into our desired format.
newmem.power = chirp_common.AutoNamedPowerLevel(
chirp_common.dBm_to_watts(float(newmem.power)))
self._grow(newmem.number)
self.memories[newmem.number] = newmem
self.memories[newmem.number].name = newmem.name.rstrip()

def erase_memory(self, number):
mem = chirp_common.Memory()
mem.number = number
mem.empty = True
self.memories[number] = mem

def get_raw_memory(self, number):
return ",".join(chirp_common.Memory.CSV_FORMAT) + \
os.linesep + \
",".join(self.memories[number].to_csv())

@classmethod
def match_model(cls, filedata, filename):
"""Match files ending in .CSV"""
try:
filedata = filedata.decode()
except UnicodeDecodeError:
# CSV files are text
return False
return filename.lower().endswith("." + cls.FILE_EXTENSION) and \
(find_csv_header(filedata) or filedata == "")


def find_csv_header(filedata):
if filedata.startswith('\ufeff') or filedata.startswith('\ufffe'):
# Skip BOM
filedata = filedata[1:]
while filedata.startswith('#'):
filedata = filedata[filedata.find('\n') + 1:]
return filedata.startswith('Location,')


@directory.register
class CommanderCSVRadio(CSVRadio):
"""A driver for reading CSV files generated by KG-UV Commander software"""
VENDOR = "Commander"
MODEL = "KG-UV"
FILE_EXTENSION = "csv"

MODE_MAP = {
"NARR": "NFM",
"WIDE": "FM",
}

SCAN_MAP = {
"ON": "",
"OFF": "S"
}

ATTR_MAP = {
"#": (int, "number"),
"Name": (str, "name"),
"RX Freq": (chirp_common.parse_freq, "freq"),
"Scan": (lambda v: CommanderCSVRadio.SCAN_MAP.get(v), "skip"),
"TX Dev": (lambda v: CommanderCSVRadio.MODE_MAP.get(v), "mode"),
"Group/Notes": (str, "comment"),
}

def _clean_number(self, headers, line, mem):
if mem.number == 0:
for memory in self.memories:
if memory.empty:
mem.number = memory.number
break
return mem

def _clean_duplex(self, headers, line, mem):
try:
txfreq = chirp_common.parse_freq(
get_datum_by_header(headers, line, "TX Freq"))
except ValueError:
mem.duplex = "off"
return mem

if mem.freq == txfreq:
mem.duplex = ""
elif txfreq:
mem.duplex = "split"
mem.offset = txfreq

return mem

def _clean_tmode(self, headers, line, mem):
rtone = get_datum_by_header(headers, line, "Encode")
ctone = get_datum_by_header(headers, line, "Decode")
if rtone == "OFF":
rtone = None
else:
rtone = float(rtone)

if ctone == "OFF":
ctone = None
else:
ctone = float(ctone)

if rtone:
mem.tmode = "Tone"
if ctone:
mem.tmode = "TSQL"

mem.rtone = rtone or 88.5
mem.ctone = ctone or mem.rtone

return mem

@classmethod
def match_model(cls, filedata, filename):
"""Match files ending in .csv and using Commander column names."""
return filename.lower().endswith("." + cls.FILE_EXTENSION) and \
filedata.startswith(b"Name,RX Freq,TX Freq,Decode,Encode,TX Pwr,"
b"Scan,TX Dev,Busy Lck,Group/Notes") or \
filedata.startswith(b'"#","Name","RX Freq","TX Freq","Decode",'
b'"Encode","TX Pwr","Scan","TX Dev",'
b'"Busy Lck","Group/Notes"')


@directory.register
class RTCSVRadio(CSVRadio):
"""A driver for reading CSV files generated by RT Systems software"""
VENDOR = "RT Systems"
MODEL = "CSV"
FILE_EXTENSION = "csv"

DUPLEX_MAP = {
"Minus": "-",
"Plus": "+",
"Simplex": "",
"Split": "split",
}

SKIP_MAP = {
"Off": "",
"On": "S",
"P Scan": "P",
"Skip": "S",
}

TMODE_MAP = {
"None": "",
"T Sql": "TSQL",
}

BOOL_MAP = {
"Off": False,
"On": True,
}

ATTR_MAP = {
"Channel Number": (int, "number"),
"Receive Frequency": (chirp_common.parse_freq, "freq"),
"Offset Frequency": (chirp_common.parse_freq, "offset"),
"Offset Direction": (lambda v:
RTCSVRadio.DUPLEX_MAP.get(v, v), "duplex"),
"Operating Mode": (str, "mode"),
"Name": (str, "name"),
"Tone Mode": (lambda v:
RTCSVRadio.TMODE_MAP.get(v, v), "tmode"),
"CTCSS": (lambda v:
float(v.split(" ")[0]), "rtone"),
"DCS": (int, "dtcs"),
"Skip": (lambda v:
RTCSVRadio.SKIP_MAP.get(v, v), "skip"),
"Step": (lambda v:
float(v.split(" ")[0]), "tuning_step"),
"Mask": (lambda v:
RTCSVRadio.BOOL_MAP.get(v, v), "empty",),
"Comment": (str, "comment"),
}

def _clean_duplex(self, headers, line, mem):
if mem.duplex == "split":
try:
val = get_datum_by_header(headers, line, "Transmit Frequency")
val = chirp_common.parse_freq(val)
mem.offset = val
except OmittedHeaderError:
pass

return mem

def _clean_mode(self, headers, line, mem):
if mem.mode == "FM":
try:
val = get_datum_by_header(headers, line, "Half Dev")
if self.BOOL_MAP[val]:
mem.mode = "FMN"
except OmittedHeaderError:
pass

return mem

def _clean_ctone(self, headers, line, mem):
# RT Systems only stores a single tone value
mem.ctone = mem.rtone
return mem

@classmethod
def match_model(cls, filedata, filename):
"""Match files ending in .csv and using RT Systems column names."""
# RT Systems provides a different set of columns for each radio.
# We attempt to match only the first few columns, hoping they are
# consistent across radio models.
try:
filedata = filedata.decode()
except UnicodeDecodeError:
# CSV files are text
return False
return filename.lower().endswith("." + cls.FILE_EXTENSION) and \
filedata.startswith("Channel Number,Receive Frequency,"
"Transmit Frequency,Offset Frequency,"
"Offset Direction,Operating Mode,"
"Name,Tone Mode,CTCSS,DCS")
(3-3/3)