Source code for src.util_mdtf

"""Common functions and classes used in multiple places in the MDTF code. 
"""
import os
import io
import collections
import re
import glob
import shutil
import string
import tempfile
from src import util


class ConfigManager(util.Singleton):
    """Shared configuration object for the MDTF code.

    Stores the resolved root paths, the POD and realm information taken from
    ``pod_info_tuple``, a dict for global environment variables and a copy of
    the CLI configuration.
    """

    def __init__(self, cli_obj=None, pod_info_tuple=None, unittest=False):
        """Populate the configuration from the CLI object and POD info.

        Args:
            cli_obj: Object exposing ``config`` and ``code_root`` attributes.
                Must be truthy.
            pod_info_tuple: Object exposing ``pod_data``, ``sorted_lists``
                and ``realm_data`` attributes.
            unittest: Forwarded to :class:`_PathManager`.
        """
        # Singleton, so init should only ever be called once
        assert cli_obj

        # Root paths
        self.paths = _PathManager(
            cli_obj.config, cli_obj.code_root, unittest
        )

        # POD and realm information
        sorted_lists = pod_info_tuple.sorted_lists
        self.pods = pod_info_tuple.pod_data
        self.all_realms = sorted_lists.get('realms', [])
        self.pod_realms = pod_info_tuple.realm_data

        self.global_envvars = {}

        # Keep a copy of every config setting
        self.config = util.NameSpace.fromDict(cli_obj.config)
class _PathManager(util.NameSpace):
    """:class:`~util.NameSpace` holding root paths for the MDTF code.

    These are set in the ``paths`` section of ``defaults.jsonc``.
    """

    def __init__(self, d, code_root=None, unittest=False):
        """Record the code root and, outside unit tests, check it exists.

        Args:
            d: Not used by this method.
            code_root: Root directory of the code; stored as ``CODE_ROOT``.
            unittest: If true, skip the directory check and make
                :meth:`_init_path` return placeholder values.
        """
        self._unittest = unittest
        self.CODE_ROOT = code_root
        if self._unittest:
            return
        assert os.path.isdir(self.CODE_ROOT)

    def parse(self, d, paths_to_parse=[], env=None):
        """Resolve the path settings in ``d`` and store them on this object.

        Args:
            d: Mapping of setting names to values. Entries whose key is in
                ``paths_to_parse`` are overwritten with the resolved path.
            paths_to_parse: Names of the settings to resolve as paths. A
                warning is printed if empty.
            env: Forwarded to :meth:`_init_path`.
        """
        # set by CLI settings that have "parse_type": "path" in JSON entry
        if not paths_to_parse:
            print("Warning: didn't get list of paths from CLI.")
        for path_key in paths_to_parse:
            self[path_key] = self._init_path(path_key, d, env=env)
            if path_key in d:
                d[path_key] = self[path_key]

        # Assigned explicitly as well: redundant, but keeps linters quiet
        # about these attributes.
        self.OBS_DATA_ROOT = self._init_path('OBS_DATA_ROOT', d, env=env)
        self.MODEL_DATA_ROOT = self._init_path('MODEL_DATA_ROOT', d, env=env)
        self.WORKING_DIR = self._init_path('WORKING_DIR', d, env=env)
        self.OUTPUT_DIR = self._init_path('OUTPUT_DIR', d, env=env)

        # Fall back to the output directory when no working dir was given.
        if not self.WORKING_DIR:
            self.WORKING_DIR = self.OUTPUT_DIR

    def _init_path(self, key, d, env=None):
        """Return the resolved path for setting ``key`` taken from ``d``.

        In unit-test mode the string ``'TEST_' + key`` is returned instead.
        """
        if self._unittest:
            # use in unit testing only
            return 'TEST_'+key
        # need to check existence in case we're being called directly
        assert key in d, 'Error: {} not initialized.'.format(key)
        raw_value = util.coerce_from_iter(d[key])
        return util.resolve_path(raw_value, root_path=self.CODE_ROOT, env=env)

    def model_paths(self, case, overwrite=False):
        """Return a namespace with the data, working and output dirs of a case.

        Args:
            case: Either a dict with keys ``CASENAME``, ``FIRSTYR`` and
                ``LASTYR``, or an object with attributes ``case_name``,
                ``firstyr`` and ``lastyr``.
            overwrite: If false, the working and output directory names are
                passed through :func:`bump_version`.
        """
        if isinstance(case, dict):
            name, yr1, yr2 = case['CASENAME'], case['FIRSTYR'], case['LASTYR']
        else:
            name, yr1, yr2 = case.case_name, case.firstyr, case.lastyr
        case_wk_dir = 'MDTF_{}_{}_{}'.format(name, yr1, yr2)

        d = util.NameSpace()
        d.MODEL_DATA_DIR = os.path.join(self.MODEL_DATA_ROOT, name)
        d.MODEL_WK_DIR = os.path.join(self.WORKING_DIR, case_wk_dir)
        d.MODEL_OUT_DIR = os.path.join(self.OUTPUT_DIR, case_wk_dir)
        if overwrite:
            return d

        # bump both WK_DIR and OUT_DIR to same version because name of
        # former may be preserved when we copy to latter, depending on
        # copy method
        d.MODEL_WK_DIR, ver = bump_version(
            d.MODEL_WK_DIR, extra_dirs=[self.OUTPUT_DIR]
        )
        d.MODEL_OUT_DIR, _ = bump_version(d.MODEL_OUT_DIR, new_v=ver)
        return d

    def pod_paths(self, pod, case):
        """Return a namespace with the code, obs-data, work and output dirs
        of ``pod`` (identified by ``pod.name``) within ``case``.
        """
        pod_name = pod.name
        d = util.NameSpace()
        d.POD_CODE_DIR = os.path.join(self.CODE_ROOT, 'diagnostics', pod_name)
        d.POD_OBS_DATA = os.path.join(self.OBS_DATA_ROOT, pod_name)
        d.POD_WK_DIR = os.path.join(case.MODEL_WK_DIR, pod_name)
        d.POD_OUT_DIR = os.path.join(case.MODEL_OUT_DIR, pod_name)
        return d
class TempDirManager(util.Singleton):
    """Creates temporary directories under a common root and tracks them so
    they can be removed later.
    """
    _prefix = 'MDTF_temp_'

    def __init__(self, temp_root=None):
        """
        Args:
            temp_root: Existing directory in which temp dirs are created.
                Defaults to :func:`tempfile.gettempdir`.
        """
        if not temp_root:
            temp_root = tempfile.gettempdir()
        assert os.path.isdir(temp_root)
        self._root = temp_root
        self._dirs = []

    def make_tempdir(self, hash_obj=None):
        """Create a temp directory, register it, and return its path.

        Args:
            hash_obj: If None, a uniquely-named directory is made with
                :func:`tempfile.mkdtemp`. If a string, it is used as the
                directory-name suffix. Otherwise the hex form of
                ``hash(hash_obj)`` is used as the suffix.

        Returns:
            Path of the directory, which exists on return.
        """
        if hash_obj is None:
            new_dir = tempfile.mkdtemp(prefix=self._prefix, dir=self._root)
        elif isinstance(hash_obj, str):
            new_dir = os.path.join(self._root, self._prefix+hash_obj)
        else:
            # nicer-looking hash representation
            # NOTE(review): for a negative hash, hex() yields '-0x...', so
            # the [2:] slice leaves a leading 'x' in the name. Kept as-is so
            # directory names stay the same.
            hash_ = hex(hash(hash_obj))[2:]
            assert isinstance(hash_, str)
            new_dir = os.path.join(self._root, self._prefix+hash_)
        if not os.path.isdir(new_dir):
            os.makedirs(new_dir)
        assert new_dir not in self._dirs
        self._dirs.append(new_dir)
        return new_dir

    def rm_tempdir(self, path):
        """Unregister ``path`` and delete it recursively.

        ``path`` must have been returned by :meth:`make_tempdir`.
        """
        assert path in self._dirs
        self._dirs.remove(path)
        print("\tDEBUG: cleanup temp dir {}".format(path))
        shutil.rmtree(path)

    def cleanup(self):
        """Remove every directory still registered with this manager."""
        # Iterate over a snapshot: rm_tempdir() removes entries from
        # self._dirs, and mutating a list while looping over it would skip
        # every other directory.
        for d in list(self._dirs):
            self.rm_tempdir(d)
class ConventionError(Exception):
    """Raised by :class:`VariableTranslator` when a naming convention is
    defined more than once.
    """
class VariableTranslator(util.Singleton):
    """Translates variable names between the 'CF' convention and the naming
    conventions defined in the ``fieldlist_*.jsonc`` files.
    """

    def __init__(self, unittest=False, verbose=0):
        """Load the translation tables.

        Args:
            unittest: If true, a placeholder filename is used in place of
                the globbed fieldlist files.
            verbose: If greater than 0, print each convention as it is found.

        Raises:
            ConventionError: If a convention name is defined more than once.
        """
        if unittest:
            # value not used, when we're testing will mock out call to read_json
            # below with actual translation table to use for test
            config_files = ['dummy_filename']
        else:
            config = ConfigManager()
            config_files = glob.glob(os.path.join(
                config.paths.CODE_ROOT, 'src', 'fieldlist_*.jsonc'
            ))

        # always have CF-compliant option, which does no translation
        cf_axes = {
            "lon" : {"axis" : "X", "MDTF_envvar" : "lon_coord"},
            "lat" : {"axis" : "Y", "MDTF_envvar" : "lat_coord"},
            "lev" : {"axis" : "Z", "MDTF_envvar" : "lev_coord"},
            "time" : {"axis" : "T", "MDTF_envvar" : "time_coord"}
        }
        self.axes = {'CF': cf_axes}
        self.variables = {'CF': dict()}
        self.units = {'CF': dict()}

        for filename in config_files:
            table = util.read_json(filename)
            for conv in util.coerce_to_iter(table['convention_name']):
                if verbose > 0:
                    print('XXX found ', conv)
                if conv in self.variables:
                    print("ERROR: convention "+conv+" defined in "+filename+" already exists")
                    raise ConventionError
                self.axes[conv] = table.get('axes', dict())
                self.variables[conv] = util.MultiMap(table.get('var_names', dict()))
                self.units[conv] = util.MultiMap(table.get('units', dict()))

    def toCF(self, convention, varname_in):
        """Return the CF name corresponding to ``varname_in`` in ``convention``.

        Names in the 'CF' convention are returned unchanged. A ``KeyError``
        is re-raised (after printing a message) if the name is not defined.
        """
        if convention == 'CF':
            return varname_in
        assert convention in self.variables, \
            "Variable name translation doesn't recognize {}.".format(convention)
        inv_lookup = self.variables[convention].inverse()
        try:
            cf_names = inv_lookup[varname_in]
            return util.coerce_from_iter(cf_names)
        except KeyError:
            print("ERROR: name {} not defined for convention {}.".format(
                varname_in, convention))
            raise

    def fromCF(self, convention, varname_in):
        """Return the name in ``convention`` for the CF name ``varname_in``.

        For the 'CF' convention the name is returned unchanged. A
        ``KeyError`` is re-raised (after printing a message) if the name is
        not defined.
        """
        if convention == 'CF':
            return varname_in
        assert convention in self.variables, \
            "Variable name translation doesn't recognize {}.".format(convention)
        lookup = self.variables[convention]
        try:
            return lookup.get_(varname_in)
        except KeyError:
            print("ERROR: name {} not defined for convention {}.".format(
                varname_in, convention))
            raise
def get_available_programs(verbose=0):
    """Return a dict mapping the keys ``'py'``, ``'ncl'`` and ``'R'`` to the
    command names ``'python'``, ``'ncl'`` and ``'Rscript'``.

    Args:
        verbose: Not used.
    """
    # An earlier variant returned sys.executable for 'py' and had no 'R' entry.
    programs = dict(py='python', ncl='ncl', R='Rscript')
    return programs
def setenv(varname, varvalue, env_dict, verbose=0, overwrite=True):
    """Wrapper to set environment variables.

    Args:
        varname (:obj:`str`): Variable name to define.
        varvalue: Value to assign. Coerced to type :obj:`str` before being
            set in ``os.environ``; booleans are written as ``'1'`` / ``'0'``.
        env_dict (:obj:`dict`): Dict that also records the setting. It
            receives the original, unconverted ``varvalue``.
        verbose (:obj:`int`, optional): Logging verbosity level. Default 0.
        overwrite (:obj:`bool`): If set to `False`, do not overwrite the
            values of previously-set variables.
    """
    if (not overwrite) and (varname in env_dict):
        if verbose > 0:
            print("Not overwriting ENV {}={}".format(varname, env_dict[varname]))
    else:
        # Look up the variable itself, not the literal string 'varname',
        # so the override warning actually fires.
        if (varname in env_dict) \
            and (env_dict[varname] != varvalue) and (verbose > 0):
            print("WARNING: setenv {}={} overriding previous setting {}".format(
                varname, varvalue, env_dict[varname]
            ))
        env_dict[varname] = varvalue

        # environment variables must be strings
        if isinstance(varvalue, bool):
            varvalue = '1' if varvalue else '0'
        elif not isinstance(varvalue, str):
            varvalue = str(varvalue)
        os.environ[varname] = varvalue

        if verbose > 0:
            print("ENV ", varname, " = ", env_dict[varname])
    if verbose > 2:
        print("Check ", varname, " ", env_dict[varname])
def check_required_envvar(*varlist):
    """Exit the program if any required environment variable is unset.

    Args:
        *varlist: Either a single iterable of variable names (the original
            calling convention), or the names themselves as separate string
            arguments.

    Raises:
        SystemExit: With exit status 1, after printing an error message, if
            one of the variables is not present in ``os.environ``.
    """
    if len(varlist) == 1 and not isinstance(varlist[0], str):
        # Called with one list/tuple of names: unpack it.
        names = varlist[0]
    else:
        names = varlist

    for name in names:
        if name not in os.environ:
            print("ERROR: Required environment variable {} not found.".format(
                name
            ))
            # Non-zero status so calling scripts can detect the failure.
            raise SystemExit(1)
def check_required_dirs(already_exist=None, create_if_nec=None, verbose=1):
    """Check that directories exist, creating those that are allowed to be.

    Each entry may be the name of an environment variable holding the path,
    or the path itself.

    Args:
        already_exist: Entries whose directory must already exist.
        create_if_nec: Entries whose directory is created if missing.
        verbose: Verbosity level. Above 0 an error message is printed
            before raising; above 1 progress messages are printed; above 2
            entries that are not environment variables are reported.

    Raises:
        OSError: If a directory from ``already_exist`` does not exist.
    """
    must_exist = list(already_exist or [])
    may_create = list(create_if_nec or [])

    filestr = __file__+":check_required_dirs: "
    errstr = "ERROR "+filestr
    if verbose > 1:
        print(filestr +" starting")
    for dir_in in must_exist + may_create:
        if verbose > 1:
            print("\t looking at "+dir_in)

        # Entry is either an environment variable name or a literal path.
        if dir_in in os.environ:
            dir_path = os.environ[dir_in]
        else:
            if verbose > 2:
                print(" envvar "+dir_in+" not defined")
            dir_path = dir_in

        if os.path.exists(dir_path):
            print("Found "+dir_path)
        elif dir_in in may_create:
            # Report success only after the directory has been made.
            os.makedirs(dir_path)
            print(dir_path+" created")
        else:
            if verbose > 0:
                print(errstr+dir_in+" = "+dir_path+" directory does not exist")
            raise OSError(dir_path+" directory does not exist")
def bump_version(path, new_v=None, extra_dirs=None):
    """Return a versioned variant of ``path`` that doesn't conflict with
    existing files.

    Versions are written as a ``.v<N>`` suffix placed before the file
    extension, if any; version 0 means no suffix.

    Args:
        path: File or directory path. A trailing path separator is kept.
        new_v: If given, that version is applied (0 removes the version)
            without checking the filesystem. If None, the version is
            incremented from the one in ``path`` (or 0) until no path of
            that name exists.
        extra_dirs: Additional directories in which the versioned name must
            not already exist either.

    Returns:
        Tuple ``(new_path, new_v)``.
    """
    def _split_version(file_):
        # Split 'name.v<N>' into ('name', '<N>'); version is falsy if absent.
        match = re.match(r"""
            ^(?P<file_base>.*?)   # arbitrary characters (lazy match)
            (\.v(?P<version>\d+))  # literal '.v' followed by digits
            ?                      # previous group may occur 0 or 1 times
            $                      # end of string
            """, file_, re.VERBOSE)
        if match:
            return (match.group('file_base'), match.group('version'))
        else:
            return (file_, '')

    def _reassemble(dir_, file_, version, ext_, final_sep):
        # Rebuild the full path from its parts, omitting a falsy version.
        if version:
            file_ = ''.join([file_, '.v', str(version), ext_])
        else:
            # get here for version == 0, '' or None
            file_ = ''.join([file_, ext_])
        return os.path.join(dir_, file_) + final_sep

    def _path_exists(dir_list, file_, new_v, ext_, sep):
        # True if the versioned name exists in any of the directories.
        return any(
            os.path.exists(_reassemble(d, file_, new_v, ext_, sep))
            for d in dir_list
        )

    if path.endswith(os.sep):
        # remove any terminating slash on directory
        path = path.rstrip(os.sep)
        final_sep = os.sep
    else:
        final_sep = ''
    dir_, file_ = os.path.split(path)

    # Work on a private copy: appending to the object returned by
    # coerce_to_iter could otherwise mutate the caller's list (or a shared
    # default) and leak directories between calls.
    if extra_dirs:
        dir_list = list(util.coerce_to_iter(extra_dirs))
    else:
        dir_list = []
    dir_list.append(dir_)

    file_, old_v = _split_version(file_)
    if not old_v:
        # maybe it has an extension and then a version number
        file_, ext_ = os.path.splitext(file_)
        file_, old_v = _split_version(file_)
    else:
        ext_ = ''

    if new_v is not None:
        # removes version if new_v == 0
        new_path = _reassemble(dir_, file_, new_v, ext_, final_sep)
    else:
        new_v = int(old_v) if old_v else 0
        while _path_exists(dir_list, file_, new_v, ext_, final_sep):
            new_v = new_v + 1
        new_path = _reassemble(dir_, file_, new_v, ext_, final_sep)
    return (new_path, new_v)
class _DoubleBraceTemplate(string.Template):
    """Private class used by :func:`~util_mdtf.append_html_template` to do
    string templating with double curly brackets as delimiters, since single
    brackets are also used in css.

    See the `string.Template documentation
    <https://docs.python.org/3.7/library/string.html#string.Template>`__ and
    `this Stack Overflow answer <https://stackoverflow.com/a/34362892>`__.
    """
    # matching is case-sensitive, unlike default
    flags = re.VERBOSE
    # starting delimiter is two braces, then apply pattern
    delimiter = '{{'
    pattern = r"""
        \{\{(?:                 # match delimiter itself, but don't include it
        # Alternatives for what to do with string following delimiter:
        # case 1) text is an escaped double bracket, written as '{{{{'.
        (?P<escaped>\{\{)|
        # case 2) text is the name of an env var, possibly followed by whitespace,
        # followed by closing double bracket. Match POSIX env var names,
        # case-sensitive (see https://stackoverflow.com/a/2821183), with the
        # addition that hyphens are allowed.
        # Can't tell from docs what the distinction between <named> and <braced> is.
        \s*(?P<named>[a-zA-Z_][a-zA-Z0-9_-]*)\s*\}\}|
        \s*(?P<braced>[a-zA-Z_][a-zA-Z0-9_-]*)\s*\}\}|
        # case 3) none of the above: ignore & move on (when using safe_substitute)
        (?P<invalid>)
        )
    """
def append_html_template(template_file, target_file, template_dict={},
    create=True, append=True):
    """Perform substitutions on template_file and write result to target_file.

    Variable substitutions are done with custom
    `templating <https://docs.python.org/3.7/library/string.html#template-strings>`__,
    replacing *double* curly bracket-delimited keys with their values in
    template_dict. For example, if template_dict is {'A': 'foo'}, all
    occurrences of the string `{{A}}` in template_file are replaced with the
    string `foo`. Spaces between the braces and variable names are ignored.

    Double-curly-bracketed strings that don't correspond to keys in
    template_dict are left as they are (instead of raising a KeyError.)

    Double curly brackets are chosen as the delimiter to match the default
    syntax of, eg, django and jinja2. Using single curly braces leads to
    conflicts with CSS syntax.

    Args:
        template_file: Path to template file. Read as UTF-8.
        target_file: Destination path for result. Written as UTF-8.
        template_dict: :py:obj:`dict` of variable name-value pairs. Both names
            and values must be strings.
        create: Boolean, default True. If true, create target_file if it
            doesn't exist, otherwise raise an OSError.
        append: Boolean, default True. If target_file exists and this is true,
            append the substituted contents of template_file to it. If false,
            overwrite target_file with the substituted contents of
            template_file.
    """
    assert os.path.exists(template_file)
    with io.open(template_file, 'r', encoding='utf-8') as src:
        template = _DoubleBraceTemplate(src.read())
        html_str = template.safe_substitute(template_dict)

    # Choose how to open the destination.
    target_exists = os.path.exists(target_file)
    if target_exists and append:
        mode = 'a'
    elif target_exists:
        # overwrite: drop the old file, then write a fresh one
        os.remove(target_file)
        mode = 'w'
    elif create:
        mode = 'w'
    else:
        raise OSError("Can't find {}".format(target_file))

    with io.open(target_file, mode, encoding='utf-8') as dest:
        dest.write(html_str)