from __future__ import absolute_import, division, print_function, unicode_literals
import os
import io
from src import six
import re
import shutil
if os.name == 'posix' and six.PY2:
try:
import subprocess32 as subprocess
except ImportError:
import subprocess
else:
import subprocess
from collections import defaultdict, namedtuple
from itertools import chain
from operator import attrgetter, itemgetter
from abc import ABCMeta, abstractmethod, abstractproperty
from src import datelabel
from src import util
from src import util_mdtf
import src.conflict_resolution as choose
from src import cmip6
from src.data_manager import DataSet, DataManager, DataAccessError
from src.environment_manager import VirtualenvEnvironmentManager, CondaEnvironmentManager
from src.shared_diagnostic import Diagnostic, PodRequirementFailure
from src.netcdf_helper import NcoNetcdfHelper # only option currently implemented
class ModuleManager(util.Singleton):
    """Thin wrapper around the environment-modules ``modulecmd`` tool.

    Records the modules the user had loaded at startup and the modules this
    object loads, so that :meth:`revert_state` can restore the user's
    original module set.
    """
    # Default full module names used when a short name (no '/') is given.
    _current_module_versions = {
        'python2': 'python/2.7.12',
        # most recent version common to analysis and workstations; use conda anyway
        'python3': 'python/3.4.3',
        'ncl': 'ncarg/6.5.0',
        'r': 'R/3.4.4',
        'anaconda': 'anaconda2/5.1',
        'gcp': 'gcp/2.3',
        # install nco in conda environment, rather than using GFDL module
        # 'nco': 'nco/4.5.4', # 4.7.6 still broken on workstations
        'netcdf': 'netcdf/4.2'
    }

    def __init__(self):
        if 'MODULESHOME' not in os.environ:
            # could set from module --version
            raise OSError(("Unable to determine how modules are handled "
                "on this host."))
        _ = os.environ.setdefault('LOADEDMODULES', '')
        # capture the modules the user has already loaded once, when we start up,
        # so that we can restore back to this state in revert_state()
        self.user_modules = set(self._list())
        self.modules_i_loaded = set()

    def _module(self, *args):
        """Run ``modulecmd python <args>`` and exec the Python code it prints.

        Accepts either one list of arguments or the arguments unpacked.

        Raises:
            subprocess.CalledProcessError: if modulecmd exits nonzero; its
                captured stderr is attached as ``output``.
        """
        # based on $MODULESHOME/init/python.py
        if args and isinstance(args[0], list): # if we're passed explicit list, unpack it
            args = args[0]
        # normalize to a list: list + tuple would raise TypeError below
        args = list(args)
        cmd = '{}/bin/modulecmd'.format(os.environ['MODULESHOME'])
        # capture stderr so it can be reported if the command fails
        proc = subprocess.Popen([cmd, 'python'] + args,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (output, error) = proc.communicate()
        if proc.returncode != 0:
            raise subprocess.CalledProcessError(
                returncode=proc.returncode,
                cmd=' '.join([cmd, 'python'] + args), output=error)
        # modulecmd's output is Python source that updates os.environ
        exec(output)

    def _parse_names(self, *module_names):
        """Resolve short module names to full ``name/version`` strings.

        Names containing '/' are returned unchanged; others are looked up in
        ``_current_module_versions`` (KeyError if unknown). A single list or
        tuple argument is unpacked, so ``_parse_names(['a', 'b'])`` is the
        same as ``_parse_names('a', 'b')``.
        """
        if len(module_names) == 1 and isinstance(module_names[0], (list, tuple)):
            module_names = module_names[0]
        return [m if ('/' in m) else self._current_module_versions[m] \
            for m in module_names]

    def load(self, *module_names):
        """Wrapper for module load.

        Modules this object has already loaded are skipped.
        """
        for mod_name in self._parse_names(*module_names):
            if mod_name not in self.modules_i_loaded:
                self._module(['load', mod_name])
                # record only after the load succeeded, so revert_state()
                # doesn't try to unload something that never loaded
                self.modules_i_loaded.add(mod_name)

    def load_commands(self, *module_names):
        """Return shell ``module load`` commands for the given modules."""
        return ['module load {}'.format(m) \
            for m in self._parse_names(*module_names)]

    def unload(self, *module_names):
        """Wrapper for module unload.

        Only modules this object loaded are unloaded.
        """
        for mod_name in self._parse_names(*module_names):
            if mod_name in self.modules_i_loaded:
                self._module(['unload', mod_name])
                self.modules_i_loaded.discard(mod_name)

    def unload_commands(self, *module_names):
        """Return shell ``module unload`` commands for the given modules."""
        return ['module unload {}'.format(m) \
            for m in self._parse_names(*module_names)]

    def _list(self):
        """Wrapper for module list.

        Returns the entries of $LOADEDMODULES, dropping empty strings (an
        empty or missing variable means no modules are loaded).
        """
        return [m for m in os.environ.get('LOADEDMODULES', '').split(':') if m]

    def revert_state(self):
        """Unload modules we loaded and reload the user's startup modules."""
        mods_to_unload = self.modules_i_loaded.difference(self.user_modules)
        for mod in mods_to_unload:
            self._module(['unload', mod])
        # forget what we loaded, so a later load() call isn't wrongly skipped
        self.modules_i_loaded.clear()
        # User's modules may have been unloaded if we loaded a different version
        for mod in self.user_modules:
            self._module(['load', mod])
        assert set(self._list()) == self.user_modules
class GfdlDiagnostic(Diagnostic):
    """Diagnostic that creates its output directory (via make_remote_dir)
    during setUp, so the directory can act as a placeholder/lockfile when
    running in frepp cooperative mode.
    """
    def __init__(self, pod_name, verbose=0):
        super(GfdlDiagnostic, self).__init__(pod_name, verbose)
        # set to True once setUp() has created the output placeholder
        self._has_placeholder = False

    def setUp(self, verbose=0):
        cfg_mgr = util_mdtf.ConfigManager()
        # any PodRequirementFailure raised by the parent propagates unchanged
        super(GfdlDiagnostic, self).setUp(verbose)
        settings = cfg_mgr.config
        make_remote_dir(
            self.POD_OUT_DIR,
            timeout=settings.get('file_transfer_timeout', 0),
            dry_run=settings.get('dry_run', False)
        )
        self._has_placeholder = True

    def tearDown(self, verbose=0):
        # only run teardown (including logging error on index.html) if POD ran
        if not self._has_placeholder:
            return
        super(GfdlDiagnostic, self).tearDown(verbose)
class GfdlvirtualenvEnvironmentManager(VirtualenvEnvironmentManager):
    """Virtualenv manager that uses module files to switch execution
    environments, as defined on GFDL workstations and the PP/AN cluster.
    """
    # Environment name -> list of short module names understood by
    # ModuleManager. This is totally not scalable.
    # NOTE(review): 'python' is not a key of
    # ModuleManager._current_module_versions ('python2'/'python3' are), so
    # resolving it raises KeyError -- confirm which version is intended.
    _module_lookup = {
        'ncl': ['ncl'],
        'r_default': ['r'],
        'py_default': ['python'],
        'py_convective_transition_diag': ['python', 'ncl'],
        'ncl_MJO_suite': ['python', 'ncl']
    }

    def __init__(self, verbose=0):
        # instantiate ModuleManager up front (presumably a singleton via
        # util.Singleton, so it records the user's modules now -- confirm)
        _ = ModuleManager()
        super(GfdlvirtualenvEnvironmentManager, self).__init__(verbose)

    # manual-coded logic like this is not scalable
    def set_pod_env(self, pod):
        """Assign ``pod.env`` from the POD's name and runtime requirements."""
        langs = [s.lower() for s in pod.runtime_requirements]
        if pod.name == 'convective_transition_diag':
            pod.env = 'py_convective_transition_diag'
        elif pod.name == 'MJO_suite':
            pod.env = 'ncl_MJO_suite'
        elif ('r' in langs) or ('rscript' in langs):
            pod.env = 'r_default'
        elif 'ncl' in langs:
            pod.env = 'ncl'
        else:
            pod.env = 'py_default'

    def create_environment(self, env_name):
        """Load the env's modules, then create the virtualenv."""
        modMgr = ModuleManager()
        # unpack: each lookup value is a list of module names
        modMgr.load(*self._module_lookup[env_name])
        super(GfdlvirtualenvEnvironmentManager, \
            self).create_environment(env_name)

    def activate_env_commands(self, env_name):
        """Shell commands: init modules, load env modules, then activate."""
        modMgr = ModuleManager()
        mod_list = modMgr.load_commands(*self._module_lookup[env_name])
        return ['source $MODULESHOME/init/bash'] \
            + mod_list \
            + super(GfdlvirtualenvEnvironmentManager, self).activate_env_commands(env_name)

    def deactivate_env_commands(self, env_name):
        """Shell commands: deactivate, then unload env modules."""
        modMgr = ModuleManager()
        mod_list = modMgr.unload_commands(*self._module_lookup[env_name])
        return super(GfdlvirtualenvEnvironmentManager, \
            self).deactivate_env_commands(env_name) + mod_list

    def tearDown(self):
        """Parent teardown, then restore the user's original modules."""
        super(GfdlvirtualenvEnvironmentManager, self).tearDown()
        modMgr = ModuleManager()
        modMgr.revert_state()
class GfdlcondaEnvironmentManager(CondaEnvironmentManager):
    """Conda environment manager that uses mdteam's anaconda2 installation."""
    def _call_conda_create(self, env_name):
        """Refuse to create environments: the mdteam account is read-only.

        Raises:
            Exception: always.
        """
        template = "Trying to create conda env {} in read-only mdteam account."
        raise Exception(template.format(env_name))
def GfdlautoDataManager(case_dict, DateFreqMixin=None):
    """Wrapper for dispatching DataManager based on inputs.

    - No (or empty) ``CASE_ROOT_DIR``: use :class:`Gfdludacmip6DataManager`.
    - ``CASE_ROOT_DIR`` whose last path component contains 'pp': use
      :class:`GfdlppDataManager`.
    - Otherwise print an error and exit the process with status 1.
    """
    import sys
    test_root = case_dict.get('CASE_ROOT_DIR', None)
    if not test_root:
        return Gfdludacmip6DataManager(case_dict, DateFreqMixin)
    test_root = os.path.normpath(test_root)
    if 'pp' in os.path.basename(test_root):
        return GfdlppDataManager(case_dict, DateFreqMixin)
    print(("ERROR: Couldn't determine data fetch method from input. "
        "Please set '--data_manager GFDL_pp', 'GFDL_UDA_CMIP6', or "
        "'GFDL_data_cmip6', depending on the source you want."))
    # the builtin exit() is only injected by the site module and exits with
    # status 0; signal the error explicitly instead.
    sys.exit(1)
class GfdlarchiveDataManager(six.with_metaclass(ABCMeta, DataManager)):
    """Abstract DataManager for data stored in a directory hierarchy rooted
    at ``CASE_ROOT_DIR``.

    Subclasses define which subdirectories are searched
    (:meth:`subdirectory_filters`), how file paths are parsed into DataSets
    (:meth:`parse_relative_path`), and how to choose among equivalent
    candidate files (:meth:`undecided_key`,
    :meth:`_decide_allowed_components`).
    """
    def __init__(self, case_dict, DateFreqMixin=None):
        # load required modules
        modMgr = ModuleManager()
        modMgr.load('gcp') # should refactor

        config = util_mdtf.ConfigManager()
        config.config.netcdf_helper = 'NcoNetcdfHelper' # HACK for now
        super(GfdlarchiveDataManager, self).__init__(case_dict, DateFreqMixin)

        assert ('CASE_ROOT_DIR' in case_dict)
        if not os.path.isdir(case_dict['CASE_ROOT_DIR']):
            raise DataAccessError(None,
                "Can't access CASE_ROOT_DIR = '{}'".format(case_dict['CASE_ROOT_DIR']))
        self.root_dir = case_dict['CASE_ROOT_DIR']
        self.tape_filesystem = is_on_tape_filesystem(self.root_dir)

        self.frepp_mode = config.config.get('frepp', False)
        if self.frepp_mode:
            self.overwrite = True
            # flag to not overwrite config and .tar: want overwrite for frepp
            self.file_overwrite = True
            # if overwrite=False, WK_DIR & OUT_DIR will have been set to a
            # unique name in parent's init. Set it back so it will be overwritten.
            d = config.paths.modelPaths(self, overwrite=True)
            self.MODEL_WK_DIR = d.MODEL_WK_DIR
            self.MODEL_OUT_DIR = d.MODEL_OUT_DIR

    DataKey = namedtuple('DataKey', ['name_in_model', 'date_freq'])

    def dataset_key(self, dataset):
        """Key identifying a requested variable: (name_in_model, str(date_freq))."""
        return self.DataKey(
            name_in_model=dataset.name_in_model,
            date_freq=str(dataset.date_freq)
        )

    @abstractmethod
    def undecided_key(self, dataset):
        """Key of the attributes still to be chosen among candidate files."""
        pass

    @abstractmethod
    def parse_relative_path(self, subdir, filename):
        """Return a DataSet for ``subdir/filename``; raise ValueError if unparseable."""
        pass

    def _listdir(self, dir_):
        # print("\t\tDEBUG: listdir on ...{}".format(dir_[len(self.root_dir):]))
        return os.listdir(dir_)

    def _list_filtered_subdirs(self, dirs_in, subdir_filter=None):
        """Return ``dir_/subdir`` paths (relative to root_dir) for each
        directory in ``dirs_in``, keeping only subdirectories named in
        ``subdir_filter`` when a filter is given. Hidden entries and names
        ending in '.nc' are ignored.
        """
        subdir_filter = util.coerce_to_iter(subdir_filter)
        found_dirs = []
        for dir_ in dirs_in:
            found_subdirs = {d for d \
                in self._listdir(os.path.join(self.root_dir, dir_)) \
                if not (d.startswith('.') or d.endswith('.nc'))
            }
            if subdir_filter:
                found_subdirs = found_subdirs.intersection(subdir_filter)
            if not found_subdirs:
                print("\tCouldn't find subdirs (in {}) at {}, skipping".format(
                    subdir_filter, os.path.join(self.root_dir, dir_)
                ))
                continue
            found_dirs.extend([
                os.path.join(dir_, subdir_) for subdir_ in found_subdirs \
                if os.path.isdir(os.path.join(self.root_dir, dir_, subdir_))
            ])
        return found_dirs

    @abstractmethod
    def subdirectory_filters(self):
        """List of per-level directory-name filters (None = no filter)."""
        pass

    def _query_data(self):
        """Populate ``data_files`` and ``_component_map`` with candidate files.

        Walks the directories selected by :meth:`subdirectory_filters`,
        parses each non-tiled ``.nc`` file with :meth:`parse_relative_path`,
        and, for each requested data key whose files in one directory form a
        contiguous date span containing ``self.date_range``, records:

        - ``data_files[data_key]``: the :meth:`undecided_key` of that
          directory's files (one candidate choice);
        - ``_component_map[u_key, data_key]``: the DataSets for that choice.

        The decision between candidates is made in
        :meth:`plan_data_fetch_hook`, because it requires comparing the files
        found for *all* requested datasets.
        """
        self._component_map = defaultdict(list)
        # match files ending in .nc only if they aren't of the form .tile#.nc
        # (negative lookback)
        regex_no_tiles = re.compile(r".*(?<!\.tile\d)\.nc$")

        pathlist = ['']
        for filter_ in self.subdirectory_filters():
            pathlist = self._list_filtered_subdirs(pathlist, filter_)
        for dir_ in pathlist:
            file_lookup = defaultdict(list)
            dir_contents = self._listdir(os.path.join(self.root_dir, dir_))
            dir_contents = list(filter(regex_no_tiles.search, dir_contents))
            files = []
            for f in dir_contents:
                try:
                    files.append(self.parse_relative_path(dir_, f))
                except ValueError as exc:
                    print('\tDEBUG:', exc)
                    #print('\t\tDEBUG: ', exc, '\n\t\t', os.path.join(self.root_dir, dir_), f)
                    continue
            for ds in files:
                data_key = self.dataset_key(ds)
                file_lookup[data_key].append(ds)
            for data_key in self.data_keys:
                if data_key not in file_lookup:
                    continue
                try:
                    # method throws ValueError if ranges aren't contiguous
                    files_date_range = datelabel.DateRange.from_contiguous_span(
                        *[f.date_range for f in file_lookup[data_key]]
                    )
                except ValueError:
                    # Date range of remote files doesn't contain analysis range or
                    # is noncontiguous; should probably log an error
                    continue
                if not files_date_range.contains(self.date_range):
                    # should log warning
                    continue
                for ds in file_lookup[data_key]:
                    # NOTE(review): fetch_dataset() trims files that only
                    # partially overlap date_range; if DateRange.__contains__
                    # means strict containment, such edge files are dropped
                    # here instead -- confirm the intended semantics.
                    if ds.date_range in self.date_range:
                        d_key = self.dataset_key(ds)
                        assert data_key == d_key
                        u_key = self.undecided_key(ds)
                        self.data_files[data_key].update([u_key])
                        self._component_map[u_key, data_key].append(ds)

    def query_dataset(self, dataset):
        # all the work done by _query_data
        pass

    @abstractmethod
    def _decide_allowed_components(self):
        """Return dict mapping each data key to its chosen undecided key."""
        pass

    def plan_data_fetch_hook(self):
        """Filter files on model component and chunk frequency.

        Replaces each ``data_files[data_key]`` with the DataSets of the
        chosen candidate, then (on tape filesystems) dmgets all those files.
        """
        d_to_u_dict = self._decide_allowed_components()
        for data_key in self.data_keys:
            u_key = d_to_u_dict[data_key]
            print("Selected {} for {} @ {}".format(
                u_key, data_key.name_in_model, data_key.date_freq)
            )
            # check we didn't eliminate everything:
            assert self._component_map[u_key, data_key]
            self.data_files[data_key] = self._component_map[u_key, data_key]

        paths = set()
        for data_key in self.data_keys:
            for f in self.data_files[data_key]:
                paths.add(f._remote_data)
        # skip dmget when there is nothing to stage
        if self.tape_filesystem and paths:
            print("start dmget of {} files".format(len(paths)))
            util.run_command(['dmget','-t','-v'] + list(paths),
                timeout= len(paths) * self.file_transfer_timeout,
                dry_run=self.dry_run
            )
            print("end dmget")

    def local_data_is_current(self, dataset):
        """Test whether data is current based on filesystem modification dates.

        Currently always returns False (data is always re-fetched).

        TODO:
        - Throw an error if local copy has been modified after remote copy.
        - Handle case where local data involves processing of remote data, like
          ncrcat'ing. Copy raw remote files to temp directory if we need to
          process?
        - gcp --sync does this already.
        """
        return False
        # return os.path.getmtime(dataset._local_data) \
        #     >= os.path.getmtime(dataset._remote_data)

    def remote_data_list(self):
        """Process list of requested data to make data fetching efficient.
        """
        return sorted(list(self.data_keys))

    def _fetch_exception_handler(self, exc):
        """Print ``exc`` and mark every POD needing ``exc.dataset`` as skipped."""
        print(exc)
        # iterating over the keys themselves, so that will be what's passed
        # in the exception
        for pod in self.data_pods[exc.dataset]:
            print("\tSkipping pod {} due to data fetch error.".format(pod.name))
            pod.skipped = exc

    def fetch_dataset(self, d_key, method='auto'):
        """Copy files to temporary directory and combine chunks.

        Copies (or links) each chosen remote file for ``d_key`` into a temp
        directory, reconciles the requested DataSets' axes with those in the
        first file, trims the time axis of files extending past
        ``self.date_range``, then concatenates multiple chunks into
        ``self.local_path(d_key)`` or symlinks a single chunk there.
        """
        # pylint: disable=maybe-no-member
        (cp_command, smartsite) = self._determine_fetch_method(method)
        dest_path = self.local_path(d_key)
        dest_dir = os.path.dirname(dest_path)
        # ncrcat will error instead of creating destination directories
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        # GCP can't copy to home dir, so always copy to temp
        tmpdirs = util_mdtf.TempDirManager()
        work_dir = tmpdirs.make_tempdir(hash_obj = d_key)
        remote_files = sorted( # cast from set to list so we can go in chrono order
            list(self.data_files[d_key]), key=lambda ds: ds.date_range.start
        )

        # copy remote files
        # TODO: Do something intelligent with logging, caught OSErrors
        for f in remote_files:
            print("\tcopying ...{} to {}".format(
                f._remote_data[len(self.root_dir):], work_dir
            ))
            util.run_command(cp_command + [
                    smartsite + f._remote_data,
                    # gcp requires trailing slash, ln ignores it
                    smartsite + work_dir + os.sep
                ],
                timeout=self.file_transfer_timeout,
                dry_run=self.dry_run
            )

        # ----------------------------------------
        # Processing of copied files
        # set axis names from header info
        # only look at first file; if other chunks for same var differ, NCO will
        # raise error when we try to concat them
        file_name = os.path.basename(remote_files[0]._remote_data)
        var_name = remote_files[0].name_in_model
        file_axes = self.nc_get_axes_attributes(
            var_name,
            in_file=file_name, cwd=work_dir, dry_run=self.dry_run
        )
        self._reconcile_file_axes(d_key, file_axes, file_name)

        # crop time axis to requested range
        # do this *before* combining chunks to reduce disk activity
        time_var_name = self._find_time_axis_name(d_key, file_name)
        trim_count = 0
        for f in remote_files:
            file_name = os.path.basename(f._remote_data)
            if not self.date_range.overlaps(f.date_range):
                print(("\tWarning: {} has dates {} outside of requested "
                    "range {}.").format(file_name, f.date_range, self.date_range))
                continue
            if not self.date_range.contains(f.date_range):
                # file overlaps analysis range but is not strictly contained
                # in it means we need to trim either start or end or both
                trimmed_range = f.date_range.intersection(
                    self.date_range,
                    precision=f.date_range.precision
                )
                print("\ttrimming '{}' of {} from {} to {}".format(
                    time_var_name, file_name, f.date_range, trimmed_range))
                trim_count = trim_count + 1
                self.nc_crop_time_axis(
                    time_var_name, trimmed_range,
                    in_file=file_name, cwd=work_dir, dry_run=self.dry_run
                )
        if trim_count > 2:
            # sanity check: only the first and last chunks should need trimming
            print("trimmed {} files!".format(trim_count))
            raise AssertionError()

        # cat chunks to destination, if more than one
        if len(remote_files) > 1:
            # not running in shell, so can't use glob expansion.
            print("\tcatting {} chunks to {}".format(
                d_key.name_in_model, dest_path
            ))
            chunks = [os.path.basename(f._remote_data) for f in remote_files]
            self.nc_cat_chunks(chunks, dest_path,
                cwd=work_dir, dry_run=self.dry_run
            )
        else:
            f = util.coerce_from_iter(remote_files)
            file_name = os.path.basename(f._remote_data)
            print("\tsymlinking {} to {}".format(d_key.name_in_model, dest_path))
            util.run_command(['ln', '-fs', \
                os.path.join(work_dir, file_name), dest_path],
                dry_run=self.dry_run
            )
        # temp files cleaned up by data_manager.tearDown

    def _reconcile_file_axes(self, d_key, file_axes, file_name):
        """Update ``axes`` of each DataSet in ``self.data_keys[d_key]`` to match
        ``file_axes`` (axis name -> attribute dict) read from ``file_name``.

        Axes whose names match are checked for a consistent 'axis' attribute;
        otherwise a case axis with the same 'axis' attribute is renamed to the
        file's axis name. Each kind of warning prints at most once per axis.
        """
        for fax, fax_attrs in iter(file_axes.items()):
            # update DataSets with axis info - need to loop since multiple PODs
            # may reference this file (warning will be repeated; TODO fix that)
            error_flag = 0
            for var in self.data_keys[d_key]:
                if fax in var.axes:
                    # file's axis in list of case's axis names; check their
                    # axis attributes match if they're both defined
                    if 'axis' in fax_attrs and 'axis' in var.axes[fax] \
                        and fax_attrs['axis'].lower() != var.axes[fax]['axis'].lower() \
                        and error_flag != 1:
                        print(("\tWarning: unexpected axis attribute for {0} in "
                            "{1} (found {2}, {3} convention is {4})").format(
                                fax, file_name, fax_attrs['axis'],
                                self.convention, var.axes[fax]['axis']
                        ))
                        error_flag = 1
                    var.axes[fax]['MDTF_set_from_axis'] = False
                else:
                    # file has different axis name, try to match by attribute.
                    # Iterate over a snapshot since var.axes is modified below.
                    for vax, vax_attrs in list(var.axes.items()):
                        if 'axis' not in fax_attrs or 'axis' not in vax_attrs:
                            continue
                        elif vax_attrs['axis'].lower() == fax_attrs['axis'].lower():
                            # matched axis attributes: log warning & reassign
                            if error_flag != 2:
                                print(("\tWarning: unexpected {0} axis name in {1} "
                                    "(found {2}, {3} convention is {4})").format(
                                        fax_attrs['axis'], file_name, fax,
                                        self.convention, vax
                                ))
                                error_flag = 2
                            # only update so we don't overwrite the envvar name
                            var.axes[fax] = vax_attrs.copy()
                            var.axes[fax].update(fax_attrs)
                            var.axes[fax]['MDTF_set_from_axis'] = True
                            del var.axes[vax]
                            break
                    else:
                        # get here if we didn't hit 'break' above -- give up
                        if error_flag != 3:
                            print(("\tWarning: unable to assign {0} axis "
                                "in {1}.").format(fax, file_name))
                            error_flag = 3

    def _find_time_axis_name(self, d_key, file_name):
        """Return the name of the first axis with 'axis' attribute 'T' (case-
        insensitive) among the DataSets in ``self.data_keys[d_key]``; fall back
        to 'time' with a message if none is found.
        """
        for var in self.data_keys[d_key]:
            for vax, vax_attrs in iter(var.axes.items()):
                if 'axis' in vax_attrs and vax_attrs['axis'].lower() == 't':
                    return vax
        print("\tCan't determine time axis for {}.".format(file_name))
        return 'time' # will probably give KeyError

    def _determine_fetch_method(self, method='auto'):
        """Return (command list, path prefix) used to copy remote files.

        Unrecognized ``method`` values (e.g. 'auto') select gcp on tape
        filesystems and symlinks otherwise.
        """
        _methods = {
            'gcp': {'command': ['gcp', '--sync', '-v', '-cd'], 'site':'gfdl:'},
            'cp':  {'command': ['cp'], 'site':''},
            'ln':  {'command': ['ln', '-fs'], 'site':''}
        }
        if method not in _methods:
            if self.tape_filesystem:
                method = 'gcp' # use GCP for DMF filesystems
            else:
                method = 'ln' # symlink for local files
        return (_methods[method]['command'], _methods[method]['site'])

    def process_fetched_data_hook(self):
        pass

    def _make_html(self, cleanup=False):
        """Build the HTML index; ``cleanup`` is ignored (see comment)."""
        # never cleanup html if we're in frepp_mode, since framework may run
        # later when another component finishes. Instead just append current
        # progress to TEMP_HTML.
        prev_html = os.path.join(self.MODEL_OUT_DIR, 'index.html')
        if self.frepp_mode and os.path.exists(prev_html):
            print("\tDEBUG: Appending previous index.html at {}".format(prev_html))
            with io.open(prev_html, 'r', encoding='utf-8') as f1:
                contents = f1.read()
            contents = contents.split('<!--CUT-->')
            assert len(contents) == 3
            contents = contents[1]

            if os.path.exists(self.TEMP_HTML):
                mode = 'a'
            else:
                print("\tWARNING: No file at {}.".format(self.TEMP_HTML))
                mode = 'w'
            with io.open(self.TEMP_HTML, mode, encoding='utf-8') as f2:
                f2.write(contents)
        super(GfdlarchiveDataManager, self)._make_html(
            cleanup=(not self.frepp_mode)
        )

    def _make_tar_file(self, tar_dest_dir):
        """Make the tar file in WORKING_DIR, gcp it to ``tar_dest_dir`` and
        return its destination path."""
        # make locally in WORKING_DIR and gcp to destination,
        # since OUTPUT_DIR might be mounted read-only
        config = util_mdtf.ConfigManager()
        out_file = super(GfdlarchiveDataManager, self)._make_tar_file(
            config.paths.WORKING_DIR
        )
        gcp_wrapper(
            out_file, tar_dest_dir,
            timeout=self.file_transfer_timeout, dry_run=self.dry_run
        )
        _, file_ = os.path.split(out_file)
        return os.path.join(tar_dest_dir, file_)

    def _copy_to_output(self):
        """Copy results from MODEL_WK_DIR to MODEL_OUT_DIR with gcp.

        In frepp mode, copies only PODs that ran plus case-level files.
        Otherwise copies the whole working directory (renaming with a bumped
        version if MODEL_OUT_DIR exists and can't be removed), then deletes
        MODEL_WK_DIR -- except in a dry run, where nothing was copied.
        """
        # use gcp, since OUTPUT_DIR might be mounted read-only
        if self.MODEL_WK_DIR == self.MODEL_OUT_DIR:
            return # no copying needed
        if self.frepp_mode:
            # only copy PODs that ran, whether they succeeded or not
            for pod in self.pods:
                if pod._has_placeholder:
                    gcp_wrapper(
                        pod.POD_WK_DIR,
                        pod.POD_OUT_DIR,
                        timeout=self.file_transfer_timeout, dry_run=self.dry_run
                    )
            # copy all case-level files
            print("\tDEBUG: files in {}".format(self.MODEL_WK_DIR))
            for f in os.listdir(self.MODEL_WK_DIR):
                print("\t\tDEBUG: found {}".format(f))
                if os.path.isfile(os.path.join(self.MODEL_WK_DIR, f)):
                    print("\t\tDEBUG: found {}".format(f))
                    gcp_wrapper(
                        os.path.join(self.MODEL_WK_DIR, f),
                        self.MODEL_OUT_DIR,
                        timeout=self.file_transfer_timeout, dry_run=self.dry_run
                    )
        else:
            # copy everything at once
            if os.path.exists(self.MODEL_OUT_DIR):
                if self.overwrite:
                    try:
                        print('Error: {} exists, attempting to remove.'.format(
                            self.MODEL_OUT_DIR))
                        shutil.rmtree(self.MODEL_OUT_DIR)
                    except OSError:
                        # gcp will not overwrite dirs, so forced to save under
                        # a different name despite overwrite=True
                        print(("Error: couldn't remove {} (probably mounted read"
                            "-only); will rename new directory.").format(
                            self.MODEL_OUT_DIR))
                else:
                    print("Error: {} exists; will rename new directory.".format(
                        self.MODEL_OUT_DIR))
            if os.path.exists(self.MODEL_OUT_DIR):
                # check again, since rmtree() might have succeeded
                self.MODEL_OUT_DIR, version = \
                    util_mdtf.bump_version(self.MODEL_OUT_DIR)
                new_wkdir, _ = \
                    util_mdtf.bump_version(self.MODEL_WK_DIR, new_v=version)
                print("\tDEBUG: move {} to {}".format(self.MODEL_WK_DIR, new_wkdir))
                shutil.move(self.MODEL_WK_DIR, new_wkdir)
                self.MODEL_WK_DIR = new_wkdir
            gcp_wrapper(
                self.MODEL_WK_DIR, self.MODEL_OUT_DIR,
                timeout=self.file_transfer_timeout,
                dry_run=self.dry_run
            )
            # only delete MODEL_WK_DIR if copied successfully: an exception
            # from gcp_wrapper propagates before reaching this line. In a dry
            # run nothing was actually copied, so keep the working directory.
            if not self.dry_run:
                shutil.rmtree(self.MODEL_WK_DIR)
class GfdlppDataManager(GfdlarchiveDataManager):
    """DataManager for frepp /pp/ directories, whose time series files are at
    ``<component>/ts/<date_freq>/<chunk_freq>/<component>.<start>-<end>.<var>.nc``
    relative to ``CASE_ROOT_DIR``.
    """
    # Parses a path relative to root_dir; compiled once, not once per file.
    _pp_path_regex = re.compile(r"""
        /?                      # maybe initial separator
        (?P<component>\w+)/     # component name
        ts/                     # timeseries; TODO: handle time averages (not needed now)
        (?P<date_freq>\w+)/     # ts freq
        (?P<chunk_freq>\w+)/    # data chunk length
        (?P<component2>\w+)\.   # component name (again)
        (?P<start_date>\d+)-(?P<end_date>\d+)\.   # file's date range
        (?P<name_in_model>\w+)\.       # field name
        nc                      # netCDF file extension
    """, re.VERBOSE)

    def __init__(self, case_dict, DateFreqMixin=None):
        # assign explicitly else linter complains
        self.component = None
        self.data_freq = None
        self.chunk_freq = None
        super(GfdlppDataManager, self).__init__(case_dict, DateFreqMixin)

    # typename matches the attribute name (as in Gfdlcmip6abcDataManager)
    UndecidedKey = namedtuple('UndecidedKey', ['component', 'chunk_freq'])

    def undecided_key(self, dataset):
        """Candidate choice for a file: (component, str(chunk_freq))."""
        return self.UndecidedKey(
            component=dataset.component,
            chunk_freq=str(dataset.chunk_freq)
        )

    def parse_relative_path(self, subdir, filename):
        """Return a DataSet for ``subdir/filename``.

        Sets ``_remote_data`` (absolute path), ``date_range``, and
        ``date_freq``/``chunk_freq`` as ``self.DateFreq`` objects.

        Raises:
            ValueError: if the path doesn't match the /pp/ layout.
        """
        rel_path = os.path.join(subdir, filename)
        match = self._pp_path_regex.match(rel_path)
        if not match:
            raise ValueError("Can't parse {}, skipping.".format(rel_path))
        #if match.group('component') != match.group('component2'):
        #    raise ValueError("Can't parse {}.".format(rel_path))
        ds = DataSet(**(match.groupdict()))
        del ds.component2
        ds._remote_data = os.path.join(self.root_dir, rel_path)
        ds.date_range = datelabel.DateRange(ds.start_date, ds.end_date)
        ds.date_freq = self.DateFreq(ds.date_freq)
        ds.chunk_freq = self.DateFreq(ds.chunk_freq)
        return ds

    def subdirectory_filters(self):
        """Filters for <component>/ts/<date_freq>/<chunk_freq>/ levels."""
        return [self.component, 'ts', frepp_freq(self.data_freq),
            frepp_freq(self.chunk_freq)]

    @staticmethod
    def _heuristic_component_tiebreaker(str_list):
        """Determine experiment component(s) from heuristics.

        1. If we're passed multiple components, select those containing 'cmip'.
        2. If that selects multiple components, break the tie by selecting the
           component with the fewest words (separated by '_'), or, failing that,
           the shortest overall name.

        Args:
            str_list (:py:obj:`list` of :py:obj:`str`:): list of component names.

        Returns: :py:obj:`str`: name of component that breaks the tie.
        """
        def _heuristic_tiebreaker_sub(strs):
            min_len = min(len(s.split('_')) for s in strs)
            strs2 = [s for s in strs if (len(s.split('_')) == min_len)]
            if len(strs2) == 1:
                return strs2[0]
            else:
                return min(strs2, key=len)

        cmip_list = [s for s in str_list if ('cmip' in s.lower())]
        if cmip_list:
            return _heuristic_tiebreaker_sub(cmip_list)
        else:
            return _heuristic_tiebreaker_sub(str_list)

    def _decide_allowed_components(self):
        """Choose one (component, chunk_freq) per data key.

        The component is chosen by a minimum cover over all data keys; among
        that component's candidates, the shortest chunk frequency is taken.
        """
        choices = dict.fromkeys(self.data_files)
        cmpt_choices = choose.minimum_cover(
            self.data_files,
            attrgetter('component'),
            self._heuristic_component_tiebreaker
        )
        for data_key, cmpt in iter(cmpt_choices.items()):
            candidates = [u_key for u_key in self.data_files[data_key] \
                if u_key.component == cmpt]
            # take shortest chunk frequency (revisit?). UndecidedKey stores
            # chunk_freq as a string, and string comparison ranks '10yr' before
            # '5yr', so compare the DateFreq objects of the matching DataSets.
            shortest = min(candidates,
                key=lambda u_key: self._component_map[u_key, data_key][0].chunk_freq)
            choices[data_key] = self.UndecidedKey(
                component=cmpt, chunk_freq=str(shortest.chunk_freq))
        return choices
class Gfdlcmip6abcDataManager(six.with_metaclass(ABCMeta, GfdlarchiveDataManager)):
    """Abstract DataManager for CMIP6 DRS directory trees rooted at
    ``_cmip6_root``. ``CASE_ROOT_DIR`` is built from the activity, institution,
    source (model), experiment and member ids.
    """
    def __init__(self, case_dict, DateFreqMixin=None):
        # set root_dir
        # from experiment and model, determine institution and mip
        # set realization code = 'r1i1p1f1' unless specified
        cmip = cmip6.CMIP6_CVs()
        # Set every id attribute whether it was given explicitly or looked
        # up: all of them are needed to build CASE_ROOT_DIR below.
        if 'experiment_id' in case_dict:
            self.experiment_id = case_dict['experiment_id']
        elif 'experiment' in case_dict:
            self.experiment_id = case_dict['experiment']
        else:
            raise Exception("Can't determine experiment.")
        if 'activity_id' in case_dict:
            self.activity_id = case_dict['activity_id']
        else:
            self.activity_id = cmip.lookup(
                self.experiment_id, 'experiment_id', 'activity_id')

        if 'source_id' in case_dict:
            self.source_id = case_dict['source_id']
        elif 'model' in case_dict:
            self.source_id = case_dict['model']
        else:
            raise Exception("Can't determine model/source.")
        if 'institution_id' in case_dict:
            self.institution_id = case_dict['institution_id']
        else:
            self.institution_id = cmip.lookup(
                self.source_id, 'source_id', 'institution_id')

        self.member_id = case_dict.get('member_id', 'r1i1p1f1')
        case_dict['CASE_ROOT_DIR'] = os.path.join(
            self._cmip6_root, self.activity_id, self.institution_id,
            self.source_id, self.experiment_id, self.member_id)

        # assign explicitly else linter complains
        self.data_freq = None
        self.table_id = None
        self.grid_label = None
        self.version_date = None
        super(Gfdlcmip6abcDataManager, self).__init__(
            case_dict, DateFreqMixin=cmip6.CMIP6DateFrequency
        )
        # data_freq is always in __dict__ (set above), so test its value
        if self.data_freq is not None:
            self.table_id = cmip.table_id_from_freq(self.data_freq)

    @abstractmethod # note: only using this as a property
    def _cmip6_root(self):
        pass

    # also need to determine table?
    UndecidedKey = namedtuple('UndecidedKey',
        ['table_id', 'grid_label', 'version_date'])

    def undecided_key(self, dataset):
        """Candidate choice for a file: (table_id, grid_label, version_date)."""
        return self.UndecidedKey(
            table_id=str(dataset.table_id),
            grid_label=dataset.grid_label,
            version_date=str(dataset.version_date)
        )

    def parse_relative_path(self, subdir, filename):
        """Return a DataSet for ``subdir/filename`` parsed from the DRS path."""
        d = cmip6.parse_DRS_path(
            os.path.join(self.root_dir, subdir)[len(self._cmip6_root):],
            filename
        )
        d['name_in_model'] = d['variable_id']
        ds = DataSet(**d)
        ds._remote_data = os.path.join(self.root_dir, subdir, filename)
        return ds

    def subdirectory_filters(self):
        """Filters for <table_id>/<variable_id>/<grid_label>/<version> levels."""
        return [self.table_id, None, # variable_id
            self.grid_label, self.version_date]

    @staticmethod
    def _cmip6_table_tiebreaker(str_list):
        # no suffix or qualifier, if possible
        tbls = [cmip6.parse_mip_table_id(t) for t in str_list]
        tbls = [t for t in tbls if (not t['spatial_avg'] and not t['region'] \
            and t['temporal_avg'] == 'interval')]
        if not tbls:
            raise Exception('Need to refine table_id more carefully')
        tbls = min(tbls, key=lambda t: len(t['table_prefix']))
        return tbls['table_id']

    @staticmethod
    def _cmip6_grid_tiebreaker(str_list):
        # no suffix or qualifier, if possible
        grids = [cmip6.parse_grid_label(g) for g in str_list]
        grids = [g for g in grids if (
            not g['spatial_avg'] and not g['region']
        )]
        if not grids:
            raise Exception('Need to refine grid_label more carefully')
        grids = min(grids, key=itemgetter('grid_number'))
        return grids['grid_label']

    def _decide_allowed_components(self):
        """Choose table, grid label and version date for every data key."""
        tables = choose.minimum_cover(
            self.data_files,
            attrgetter('table_id'),
            self._cmip6_table_tiebreaker
        )
        dkeys_for_each_pod = list(self.data_pods.inverse().values())
        grid_lbl = choose.all_same_if_possible(
            self.data_files,
            dkeys_for_each_pod,
            attrgetter('grid_label'),
            self._cmip6_grid_tiebreaker
        )
        version_date = choose.require_all_same(
            self.data_files,
            attrgetter('version_date'),
            lambda dates: str(max(datelabel.Date(dt) for dt in dates))
        )
        choices = dict.fromkeys(self.data_files)
        for data_key in choices:
            choices[data_key] = self.UndecidedKey(
                table_id=str(tables[data_key]),
                grid_label=grid_lbl[data_key],
                version_date=version_date[data_key]
            )
        return choices
class Gfdludacmip6DataManager(Gfdlcmip6abcDataManager):
    """CMIP6 DataManager reading from /archive/pcmdi/repo/CMIP6."""
    _cmip6_root = os.path.join(os.sep, 'archive', 'pcmdi', 'repo', 'CMIP6')
class Gfdldatacmip6DataManager(Gfdlcmip6abcDataManager):
    """CMIP6 DataManager reading from /data_cmip6/CMIP6.

    /data_cmip6 was used to stage pre-publication data, so it shouldn't be a
    data source unless the user explicitly requests it.
    """
    _cmip6_root = os.path.join(os.sep, 'data_cmip6', 'CMIP6')
def gcp_wrapper(source_path, dest_dir, timeout=0, dry_run=False):
    """Copy ``source_path`` (file or directory) into ``dest_dir`` using gcp.

    Loads the gcp module first. For a directory source the copy is recursive
    and targets the parent of ``dest_dir``.
    """
    ModuleManager().load('gcp')
    src = os.path.normpath(source_path)
    dst = os.path.normpath(dest_dir)
    # gcp requires trailing slash, ln ignores it
    if not os.path.isdir(src):
        src_args = ['gfdl:' + src]
        dst_args = ['gfdl:' + dst + os.sep]
    else:
        src_args = ['-r', 'gfdl:' + src + os.sep]
        # gcp /A/B/ /C/D/ will result in /C/D/B, so need to specify parent dir
        dst_args = ['gfdl:' + os.path.dirname(dst) + os.sep]
    print('\tDEBUG: GCP {} -> {}'.format(src_args[-1], dst_args[-1]))
    gcp_cmd = ['gcp', '--sync', '-v', '-cd'] + src_args + dst_args
    util.run_command(gcp_cmd, timeout=timeout, dry_run=dry_run)
def make_remote_dir(dest_dir, timeout=None, dry_run=None):
    """Ensure directory ``dest_dir`` exists.

    Tries ``os.makedirs`` first. If that fails (e.g. the output location is
    on a read-only filesystem), creates an empty directory with the same
    basename in a temp dir and copies it into place with gcp. ``timeout`` and
    ``dry_run`` default to the framework config values when None.
    """
    if os.path.isdir(dest_dir):
        # already present: makedirs would raise and trigger a needless gcp copy
        return
    try:
        os.makedirs(dest_dir)
    except OSError:
        # use GCP for this because output dir might be on a read-only filesystem.
        # apparently trying to test this with os.access is less robust than
        # just catching the error
        config = util_mdtf.ConfigManager()
        tmpdirs = util_mdtf.TempDirManager()
        work_dir = tmpdirs.make_tempdir()
        if timeout is None:
            timeout = config.config.get('file_transfer_timeout', 0)
        if dry_run is None:
            dry_run = config.config.get('dry_run', False)
        work_dir = os.path.join(work_dir, os.path.basename(dest_dir))
        os.makedirs(work_dir)
        gcp_wrapper(work_dir, dest_dir, timeout=timeout, dry_run=dry_run)
def running_on_PPAN():
    """Return True if the short hostname looks like a PP/AN node (ppNNN/anNNN)."""
    short_hostname = os.uname()[1].partition('.')[0]
    return bool(re.match(r"(pp|an)\d{3}", short_hostname))
def is_on_tape_filesystem(path):
    """Return True if ``path`` (after resolving symlinks) starts with one of
    the prefixes '/arch', '/ptmp' or '/work'.
    """
    # handle eg. /arch0 et al as well as /archive.
    resolved = os.path.realpath(path)
    return resolved.startswith(('/arch', '/ptmp', '/work'))
def frepp_freq(date_freq):
    """Return the frepp directory name for a DateFrequency (None passes through).

    Single-unit frequencies other than hourly use frepp's words ('annual',
    'monthly', ...); everything else uses ``date_freq.format()``.
    """
    # logic as written would give errors for 1yr chunks (?)
    if date_freq is None:
        return None
    assert isinstance(date_freq, datelabel.DateFrequency)
    if date_freq.unit != 'hr' and date_freq.quantity == 1:
        # weekly not used in frepp
        frepp_names = dict(
            yr='annual',
            season='seasonal',
            mo='monthly',
            day='daily',
            hr='hourly'
        )
        return frepp_names[date_freq.unit]
    return date_freq.format()
# frepp stub variable name -> framework config key.
frepp_translate = {
    'in_data_dir': 'root_dir', # /pp/ directory
    'descriptor': 'CASENAME',
    'out_dir': 'OUTPUT_DIR',
    'WORKDIR': 'WORKING_DIR',
    'yr1': 'FIRSTYR',
    'yr2': 'LASTYR'
}

def parse_frepp_stub(frepp_stub):
    """Converts the frepp arguments to a Python dictionary.

    See `<https://wiki.gfdl.noaa.gov/index.php/FRE_User_Documentation#Automated_creation_of_diagnostic_figures>`__.

    Each line of the form ``set <key> [=] <value>`` becomes an entry; keys in
    ``frepp_translate`` are renamed. ``FIRSTYR``, ``LASTYR`` and ``verbose``
    are cast to int. ``make_variab_tar`` and ``test_mode`` are cast to bool:
    '0', 'false', 'f', 'no', 'n', 'off' (any case) are False, anything else True.

    Returns: :py:obj:`dict` of frepp parameters, plus ``'frepp'``: True if any
    parameter was parsed.
    """
    # parse arguments and relabel keys
    d = {}
    regex = re.compile(r"""
        \s*set[ ]                     # initial whitespace, then 'set' followed by 1 space
        (?P<key>\w+)                  # key is simple token, no problem
        \s+=?\s*                      # separator is any whitespace, with 0 or 1 "=" signs
        (?P<value>                    # want to capture all characters to end of line, so:
            [^=#\s]                   # first character = any non-separator, or '#' for comments
            .*                        # capture everything between first and last chars
            [^\s]                     # last char = non-whitespace.
            |[^=#\s]\b)               # separate case for when value is a single character.
        \s*$                          # remainder of line must be whitespace.
        """, re.VERBOSE)
    for line in frepp_stub.splitlines():
        print("line = '{}'".format(line))
        match = regex.match(line)
        if match:
            key = frepp_translate.get(match.group('key'), match.group('key'))
            d[key] = match.group('value')

    # cast from string
    for int_key in ['FIRSTYR', 'LASTYR', 'verbose']:
        if int_key in d:
            d[int_key] = int(d[int_key])
    # bool() of any non-empty string (including '0') is True, so parse the text
    false_strings = ('0', 'false', 'f', 'no', 'n', 'off', '')
    for bool_key in ['make_variab_tar', 'test_mode']:
        if bool_key in d:
            d[bool_key] = (str(d[bool_key]).strip().lower() not in false_strings)
    d['frepp'] = (d != {})
    return d