Source code for mtpy.utils.configfile

#!/usr/bin/env python

"""
Helper functions for the handling of configuration files
(survey.cfg  and BIRRP.cfg style).



@UofA, 2013
(LK)

"""

# =================================================================

import configparser
import copy
import io
import os
import os.path as op
import sys

import mtpy.utils.exceptions as MTex
import mtpy.utils.gis_tools as gis_tools

# =================================================================

list_of_required_keywords = [
    "latitude",
    "longitude",
    "elevation",
    "sampling_interval",
    "station_type",
]
list_of_required_keywords_short = ["lat", "lon", "elev", "sampling", "type"]

list_of_keyword_defaults_general = [0.0, 0.0, 0.0, 1.0, "mt"]


list_of_efield_keywords = [
    "E_logger_type",
    "E_logger_gain",
    "E_instrument_type",
    "E_instrument_amplification",
    "E_Xaxis_azimuth",
    "E_Xaxis_length",
    "E_Yaxis_azimuth",
    "E_Yaxis_length",
]

list_of_keyword_defaults_efield = [
    "edl",
    1,
    "electrodes",
    1.0,
    0.0,
    50.0,
    90.0,
    50.0,
]

list_of_bfield_keywords = [
    "B_logger_type",
    "B_logger_gain",
    "B_instrument_type",
    "B_instrument_amplification",
    "B_Xaxis_azimuth",
    "B_Yaxis_azimuth",
]


list_of_keyword_defaults_bfield = ["edl", 1, "coil", 1.0, 0.0, 90.0]


dict_of_allowed_values_efield = {
    "E_logger_type": ["edl", "elogger", "zen", "qel"],
    "E_logger_gain": [
        "low",
        "verylow",
        "high",
        0.4,
        1,
        10,
        11,
        2,
        4,
        8,
        16,
        32,
        64,
    ],
    "E_instrument_type": [
        "electrodes",
        "dipole",
        "cu-cuso4 electrodes",
        "cuso4_electrodes",
        "pbcl2_electrodes",
    ],
    "E_instrument_amplification": [1, 10, 11],
}

dict_of_allowed_values_bfield = {
    "B_logger_type": ["edl", "zen", "qel_blogger"],
    "B_logger_gain": [
        "low",
        "verylow",
        "high",
        0.4,
        1,
        10,
        2,
        4,
        8,
        16,
        32,
        64,
    ],
    "B_instrument_type": ["fluxgate", "coil", "coils"],
}

list_of_station_types = ["mt", "e", "b", "qe", "qb"]


# =================================================================


[docs] def read_configfile(filename): """Read a general config file and return the content as dictionary. Config files without sections or only DEFAULT section -> return dictionary Config files with sections -> return nested dictionary (main level keys are section heads) Config files with sections as well as section-less entries -> return nested dictionary, which includes a top level 'DEFAULT' key """ # check, if file is present if not op.isfile(filename): raise MTex.MTpyError_inputarguments("File does not exist: {0}".format(filename)) # try to parse file - exit, if not a config file try: # generate config parser instance configobject = configparser.SafeConfigParser() # do NOT ask, why it does not work with reading from filename directly...: with open(filename) as F: d = F.read() FH = io.StringIO(d) configobject.readfp(d) # filename) except: try: dummy_String = "[DEFAULT]\n" + open(filename, "r").read() FH = io.StringIO(dummy_String) # generate config parser instance configobject = configparser.SafeConfigParser() configobject.readfp(FH) except: raise MTex.MTpyError_inputarguments( "File is not a proper " "configuration file: {0}".format(filename) ) config_dict = configobject._sections if len(list(config_dict.keys())) != 0: defaults = configobject.defaults() if len(list(defaults.keys())) != 0: config_dict["DEFAULT"] = configobject.defaults() else: config_dict = configobject.defaults() return config_dict
[docs] def read_survey_configfile(filename): """Read in a survey configuration file and return a dictionary. Input config file must contain station names as section headers! The output dictionary keys are station names (capitalised), the values are (sub-)dictionaries. The configuration file must contain sections for all stations, each containing all mandatory keywords: - latitude (deg) - longitude (deg) - elevation (in meters) - sampling_interval (in seconds) - station_type (MT, (Q)E, (Q)B) Not mandatory, but recommended - declination (in degrees, positive to East) - this is set to '0.0', if omitted Depending on the type of station the following entries are required. E-field recorded: - E_logger_type ('edl'/'elogger'/'qel') - E_logger_gain (factor/gain-level) - E_instrument_type ('electrodes'/'dipole') - E_instrument_amplification (applied amplification factor) - E_Xaxis_azimuth (degrees) - E_Xaxis_length (in meters) - E_Yaxis_azimuth (degrees) - E_Yaxis_length (in meters) B-field recorded: - B_logger_type ('edl'/'qel_blogger') - B_logger_gain (factor/gain level) - B_instrument_type ('coil(s)', 'fluxgate') - B_instrument_amplification (applied amplification factor) - B_Xaxis_azimuth (degrees) - B_Yaxis_azimuth (degrees) A global section can be used to include parameters for all stations. The name of the section must be one of: global/main/default/general """ error_counter = 0 # generate config parser instance configobject = configparser.ConfigParser() # check, if file is present if not op.isfile(filename): raise MTex.MTpyError_inputarguments( "File does not" " exist: {0}".format(filename) ) # try to parse file - exit, if not a config file try: configobject.read(filename) except: raise MTex.MTpyError_inputarguments( "File is not a " "proper configuration file: {0}".format(filename) ) # obtain dict of dicts containing the input file's sections (station names) # excludes DEFAULT section and key-value pairs without section header configobject_dict = configobject._sections # initialise the output dictionary config_dict = {} # loop over the sections (stations) of the config file for station in configobject_dict: # read in the sub-dictionary for the current station - bringing all keys # to lowercase! temp_dict_in = dict( (k.lower(), v) for k, v in list(configobject_dict[station].items()) ) # initialise output sub-directory for current station stationdict = temp_dict_in # stationnames are uppercase in MTpy stationname = station if stationname in ["GLOBAL", "MAIN", "DEFAULT", "GENERAL"]: stationname = "GLOBAL" stationdict["station"] = stationname # add the station's sub-dictionary to the config dictionary config_dict[stationname] = stationdict # Check if a global section is present if "GLOBAL" in config_dict: globaldict = config_dict["GLOBAL"] else: # set defaults for location globaldict = {} # for i in ['latitude', 'longitude', 'elevation']: # #skip if values are present # if i in globaldict.keys() or i[:3] in globaldict.keys(): # continue # #otherwise set defaults # globaldict[i] = 0 # remove other general sections to avoid redundancy for i in ["MAIN", "DEFAULT", "GENERAL"]: if i in config_dict: dummy = config_dict.pop(i) # RE-loop to check for each station if required keywords are present, # if not if they can be pulled from the global section # ============================================================ # local function definition def fromglobals(key, stationdict, globaldict): """Check if stationdict contains key. If not search for key in global dict and add it to station dict. Return if global dict is not defined. Return True if key was present in either dictionary, False if not. """ if key in list(stationdict.keys()): return True, stationdict.get(key) if globaldict is None or len(globaldict) == 0: return False, None if key in globaldict: stationdict[key] = globaldict[key] return True, globaldict.get(key) return False, None # ============================================================ for station in sorted(config_dict): # do not alter the global section if station == "GLOBAL": continue stationdict = config_dict[station] # check for presence of all mandatory keywords for the current station # case insensitive - allow for short forms 'sampling', 'lat', 'lon', and 'elev' for idx, req_keyword in enumerate(list_of_required_keywords): shortform = list_of_required_keywords_short[idx] try: found = False # import ipdb # ipdb.set_trace() if fromglobals(req_keyword, stationdict, globaldict)[0] is False: # try short form instead found, value = fromglobals(shortform, stationdict, globaldict) # print shortform,value if found is True: stationdict[req_keyword] = value else: found = True if found is False: print( "Station {0} - keyword {1} missing".format( stationname, req_keyword ) ) error_counter += 1 raise Exception if req_keyword in ["elevation", "latitude", "longitude"]: # check format of lat/lon - convert to degrees, if given in # (deg,min,sec)-triple#assert correct format value = stationdict[req_keyword] try: if req_keyword in "latitude": new_value = gis_tools.assert_lat_value(value) elif req_keyword in "longitude": new_value = gis_tools.assert_lon_value(value) elif req_keyword in "elevation": new_value = gis_tools.assert_elevation_value(value) except: raise MTex.MTpyError_config_file( "Error - wrong " "coordinate format for station {0}".format(stationname) ) stationdict[req_keyword] = new_value except: raise print( "Missing information on station {0} in config file" " - setting default (dummy) value".format(station) ) stationdict[req_keyword] = list_of_keyword_defaults_general[idx] # to avoid duplicates remove the now obsolete short form from # the station dictionary dummy = stationdict.pop(shortform, None) if not stationdict["station_type"] in list_of_station_types: raise MTex.MTpyError_config_file("Station type not valid") if stationdict["station_type"] in ["mt", "e"]: # check for required electric field parameters - not done for QEL loggers yet for req_keyword in list_of_efield_keywords: if req_keyword.lower() in list(temp_dict_in.keys()): stationdict[req_keyword.lower()] = temp_dict_in[ req_keyword.lower() ].lower() else: print( "Station {0} - keyword {1} missing".format( stationname, req_keyword ) ) error_counter += 1 continue _validate_dictionary(stationdict, dict_of_allowed_values_efield) if stationdict["station_type"] in ["mt", "b"]: # check for required magnetic field parameters for req_keyword in list_of_bfield_keywords: if req_keyword.lower() in list(temp_dict_in.keys()): stationdict[req_keyword.lower()] = temp_dict_in[ req_keyword.lower() ].lower() else: print( "Station {0} - keyword {1} missing".format( stationname, req_keyword ) ) error_counter += 1 continue _validate_dictionary(stationdict, dict_of_allowed_values_bfield) # re-loop for setting up correct remote reference station information : # if rem.ref. station key is present, its information must be contained # in the same config file! for station in config_dict.keys(): stationdict = config_dict[station] if "rr_station" not in stationdict: continue # stationdict['rr_station'] = None stationdict["rr_latitude"] = None stationdict["rr_longitude"] = None stationdict["rr_elevation"] = None rem_station = stationdict["rr_station"] try: # check, if values are contained in dict float(stationdict["rr_latitude"]) float(stationdict["rr_longitude"]) float(stationdict["rr_elevation"]) except: try: # check for shortened form stationdict["rr_latitude"] = float(stationdict["rr_lat"]) stationdict["rr_longitude"] = float(stationdict["rr_lon"]) stationdict["rr_elevation"] = float(stationdict["rr_elev"]) except: try: # read from other config dict entry stationdict["rr_latitude"] = config_dict[rem_station]["latitude"] stationdict["rr_longitude"] = config_dict[rem_station]["longitude"] stationdict["rr_elevation"] = config_dict[rem_station]["elevation"] except: # if finally failed to read rr_station info,\ # set rr_station back to None stationdict["rr_station"] = None stationdict["rr_latitude"] = None stationdict["rr_longitude"] = None stationdict["rr_elevation"] = None # check consistency of coordinates, if rr_station is present if stationdict["rr_station"] != None: try: stationdict["rr_latitude"] = gis_tools.assert_lat_value( stationdict["rr_latitude"] ) stationdict["rr_longitude"] = gis_tools.assert_lon_value( stationdict["rr_longitude"] ) stationdict["rr_elevation"] = gis_tools.assert_elevation_value( stationdict["rr_elevation"] ) except: print("Problem with remote reference station ({0}) -") " remote reference ({1}) coordinates invalid -" " remote reference set to None".format( station, stationdict["rr_station"] ) stationdict["rr_station"] = None stationdict["rr_latitude"] = None stationdict["rr_longitude"] = None stationdict["rr_elevation"] = None if error_counter != 0: print( "Could not read all mandatory sections and options" " in config file - found {0} errors - check configuration" " file before continue!".format(error_counter) ) answer = 5 while not answer in ["y", "n"]: answer = input("\n\tDo you want to continue anyway? (y/n)") try: answer = answer.strip().lower()[0] except: continue if answer == "n": sys.exit() return config_dict
# =================================================================
[docs] def write_dict_to_configfile(dictionary, output_filename): """Write a dictionary into a configuration file. The dictionary can contain pure key-value pairs as well as a level-1 nested dictionary. In the first case, the entries are stored in a 'DEFAULT' section. In the latter case, the dictionary keys are taken as section heads and the sub-dictionaries key-value pairs fill up the respective section """ configobject = configparser.ConfigParser() # check for nested dictionary - # if the dict entry is a key-value pair, it's stored in a section with head 'DEFAULT' # otherwise, the dict key is taken as section header for key, val in sorted(dictionary.items()): try: for subkey, subval in sorted(val.items()): sectionhead = key if not configobject.has_section(sectionhead): configobject.add_section(sectionhead) configobject.set(sectionhead, subkey, str(subval)) except KeyError: # if not configobject.has_section('DEFAULT'): # configobject.add_section('') configobject.set("", key, str(val)) with open(output_filename, "w") as F: configobject.write(F)
# ================================================================= def _validate_dictionary(dict2validate, referencedict): """Check, if there are lists of allowed entries for all keys of the current dictionary. If yes, test, if the current value is among the allowed ones. """ for key, value in list(dict2validate.items()): # make everything to strings - easier to compare # in case of numbers, make to float first try: allowed_vals = referencedict[key] except: try: key = key.lower() allowed_vals = referencedict[key] except: # no reference entry found - skip key continue tmp = [] # allowed values must be given as a list (iterable)!! for i in allowed_vals: try: tmp.append(str(float(i))) except: tmp.append(str(i)) tmp = [i.lower() for i in tmp] allowed_vals = tmp # compare case-insensitive value = value.lower() if not value in allowed_vals: raise MTex.MTpyError_config_file( "Config file error --" " key {0}, value {1} not valid".format(key, value) ) # ==============================================================================
[docs] def read_survey_txt_file(survey_file, delimiter=None): """Read survey file and return a dictionary of dictionaries where the first nested dictionary is keyed by the station name. Each station dictionarly includes all the information input in the survey file with keywords verbatim as the headers in survey file, all lower case. *Must be included in survey file* ================= ========================================================= key word description ================= ========================================================= station station name lat(itude) latitude (decimal degrees is best) long(itude) longitude (decimal degrees is best) elev(ation) elevation (in meters) ex/E_Xaxis_length dipole length in north direction (in meters) ey/E_Yaxis_length dipole length in east direction (in meters) E_Xaxis_azimuth orientaion of Ex (degrees) E_Yaxis_azimuth orientaion of Ey (degrees) sampling_interval sampling interval in seconds hx coil number in north direction for calibration hy coil number in east direction for calibration hz coil number in vertical direction for calibration date date of deployment notes any notes that might help later station_type type of data collected (MT, E, B) declination declination in degrees (N = 0 and East = 90) ================= ========================================================= *Information on E-field data*: ========================== ================================================ key word description ========================== ================================================ E_logger_type type of data logger used to record data E_logger_gain factor/gain level E_instrument_type type of electrodes used E_instrument_amplification applied amplification factor E_Xaxis_azimuth orientaion of Ex (degrees) E_Xaxis_length length of dipole for Ex (in meters) E_Yaxis_azimuth orientaion of Ey (degrees) E_Yaxis_length length of dipole for Ey (in meters) ========================== ================================================ *Information on B-field data*: ========================== ================================================ key word description ========================== ================================================ B_logger_type type of data logger used to record data B_logger_gain factor/gain level B_instrument_type type of magnetometer used (coil, fluxgate) B_instrument_amplification applied amplification factor B_Xaxis_azimuth orientation of Bx (degrees) B_Yaxis_azimuth orientation of By (degrees) ================= ========================================================= Arguments: ----------- **survey_file** : string (full path to file) Outputs: --------- **survey_lst** : list list of dictionaries with key words the same as the headers in survey file, all lower case """ with open(survey_file, "r") as sfid: slines = sfid.readlines() slines = [i.replace('"', "") for i in slines] skeys = slines[0].rstrip() if delimiter is not None: skeys = skeys.split(delimiter) else: skeys = skeys.split() skeys = [i.strip().replace(" ", "_") for i in skeys] survey_dict = {} # print skeys, len(skeys) for ss, sline in enumerate(slines[1:]): sstr = sline.strip() if sstr[0] == "#": continue if delimiter is not None: sstr = sstr.split(delimiter) else: sstr = sstr.split() # print sstr # get rid of quotations sstr = [i.replace('"', "") for i in sstr] # get rid of spaces sstr = [i.replace(" ", "_") for i in sstr] # print sstr,len(sstr) if len(sstr) != len(skeys): print("cannot read line {0} - wrong number of entries - need {2}\ ".format(ss + 2, len(skeys))) continue sdict = {} # set default values for mandatory entries: sdict["E_Xaxis_azimuth"] = 0 sdict["E_Yaxis_azimuth"] = 90 sdict["B_Xaxis_azimuth"] = 0 sdict["B_Yaxis_azimuth"] = 90 sdict["station_type"] = "MT" sdict["declination"] = 0.0 sdict["sampling_interval"] = 0 sdict["E_instrument_amplification"] = 1.0 sdict["B_instrument_amplification"] = 1.0 sdict["E_logger_gain"] = 1.0 sdict["B_logger_gain"] = 1.0 sdict["B_instrument_type"] = "coil" sdict["E_instrument_type"] = "electrodes" sdict["E_logger_type"] = "edl" sdict["B_logger_type"] = "edl" # fill dictionary with given values for kk, skey in enumerate(skeys): # get rid of quotations skey.replace('"', "") # get rid of blank spaces in keys skey.replace(" ", "_") # do not include empty entries if len(sstr[kk]) > 0: # sstr[kk] = sstr[kk].replace('"','') sdict[skey.lower()] = sstr[kk] # print sorted(sdict) # print sdict['sampling_interval'] # sys.exit() # assigne values to the standard keys for key in list(sdict.keys()): if key.lower() in ["ex", "e_xaxis_length"]: val = copy.copy(sdict[key]) sdict.pop(key) sdict["E_Xaxis_length"] = val if key.lower() in ["ey", "e_yaxis_length"]: val = copy.copy(sdict[key]) sdict.pop(key) sdict["E_Yaxis_length"] = val if key.lower() == "station": sdict[key] = sdict[key].upper() if key.lower() in ["df", "sampling_rate", "sampling"]: val = copy.copy(sdict[key]) sdict.pop(key) sdict["sampling_interval"] = 1.0 / float(val) if key.lower() in ["dt", "sampling_interval"]: val = copy.copy(sdict[key]) sdict.pop(key) sdict["sampling_interval"] = float(val) if key.lower() == "dlgain": val = copy.copy(sdict[key]) sdict.pop(key) sdict["B_logger_gain"] = val sdict["E_logger_gain"] = val if key.lower() in ["b_logger_gain"]: sdict["B_logger_gain"] = sdict[key] if key.lower() in ["e_logger_gain"]: sdict["E_logger_gain"] = sdict[key] if key.lower() in ["e_logger_type"]: sdict["E_logger_type"] = sdict[key] if key.lower() in ["b_logger_type"]: sdict["B_logger_type"] = sdict[key] if key.lower() in ["egain", "e_instrument_amplification"]: val = copy.copy(sdict[key]) sdict.pop(key) sdict["E_instrument_amplification"] = val if key.lower() in ["bgain", "b_instrument_amplification"]: val = copy.copy(sdict[key]) sdict.pop(key) sdict["B_instrument_amplification"] = val if key.lower() in ["magtype", "b_instrument_type"]: if sdict[key].lower() in ["bb", "coil", "coils"]: sdict["B_instrument_type"] = "coil" if sdict[key].lower() in ["lp", "fluxgate", "fg"]: sdict["B_instrument_type"] = "fluxgate" sdict.pop(key) if key.lower() in ["e_instrument_type"]: if sdict[key].lower() in ["electrode", "electrodes"]: sdict["E_instrument_type"] = "electrodes" if (sdict[key].lower().find("lead") >= 0) or ( sdict[key].lower().find("pb") >= 0 ): sdict["E_instrument_type"] = "pbcl_electrodes" sdict.pop(key) if key.lower() in ["declination", "decl"]: val = copy.copy(sdict[key]) sdict.pop(key) sdict["declination"] = val if key.lower() in ["lat", "latitude"]: val = copy.copy(sdict[key]) sdict.pop(key) sdict["latitude"] = val if key.lower() in ["lon", "long", "longitude"]: val = copy.copy(sdict[key]) sdict.pop(key) sdict["longitude"] = val if key.lower() in ["ele", "elev", "elevation", "height"]: val = copy.copy(sdict[key]) sdict.pop(key) sdict["elevation"] = val try: survey_dict[sdict["station"]] = sdict except KeyError: try: survey_dict[sdict["station_name"]] = sdict except KeyError: survey_dict["MT{0:03}".format(ss)] = sdict return survey_dict
# ==============================================================================
[docs] def write_config_from_survey_txt_file(survey_file, save_name=None, delimiter="\t"): """Write a survey configuration file from a survey txt file . Arguments:: **survey_file** : string full path to survey text file. See read_survey_txt_file for the assumed header information. **save_name** : string name to save file to. If save_name = None, then file saved as os.path.join(os.path.dirname(survey_file, os.path.basename(survey_file).cfg) Outputs:: **cfg_fn** : string full path to saved config file """ survey_dict = read_survey_txt_file(survey_file, delimiter=delimiter) # get the filename to save to if save_name is None: save_dir = os.path.dirname(survey_file) save_fn = os.path.splitext(os.path.basename(survey_file))[0] + ".cfg" save_name = os.path.join(save_dir, save_fn) elif os.path.isfile(save_name): pass elif os.path.isdir(save_name): save_fn = os.path.splitext(os.path.basename(survey_file))[0] + ".cfg" save_name = os.path.join(save_name, save_fn) if not save_name.lower().endswith(".cfg"): save_name += ".cfg" # write the config file write_dict_to_configfile(survey_dict, save_name) return save_name