#!/usr/bin/env python
# ======================================================================
# NOAA Model Diagnotics Task Force (MDTF) Diagnostic Driver
#
# March 2019
# Dani Coleman, NCAR
# Chih-Chieh (Jack) Chen, NCAR,
# Yi-Hung Kuo, UCLA
#
# The MDTF code package and the participating PODs are distributed under
# the LGPLv3 license (see LICENSE.txt).
# ======================================================================
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
# do version check before importing other stuff
if sys.version_info[0] == 2 and sys.version_info[1] < 7:
print(("ERROR: MDTF currently only supports python >= 2.7. Please check "
"which version is on your $PATH (e.g. with `which python`.)"))
print("Attempted to run with following python version:\n{}".format(sys.version))
exit()
# passed; continue with imports
import os
import signal
import shutil
from src import cli
from src import util
from src import util_mdtf
from src import data_manager
from src import environment_manager
from src import shared_diagnostic
from src import netcdf_helper
[docs]class MDTFFramework(object):
[docs] def __init__(self, code_root, defaults_rel_path):
"""Initial dispatch of CLI args: are we printing help info or running
framework.
"""
self.code_root = code_root
# delete temp files if we're killed
signal.signal(signal.SIGTERM, self.cleanup_tempdirs)
signal.signal(signal.SIGINT, self.cleanup_tempdirs)
# poor man's subparser: argparse's subparser doesn't handle this
# use case easily, so just dispatch on first argument
if len(sys.argv) == 1 or \
len(sys.argv) == 2 and sys.argv[1].lower().endswith('help'):
# build CLI, print its help and exit
cli_obj = cli.FrameworkCLIHandler(code_root, defaults_rel_path)
cli_obj.parser.print_help()
exit()
elif sys.argv[1].lower() == 'info':
# "subparser" for command-line info
cli.InfoCLIHandler(self.code_root, sys.argv[2:])
else:
# not printing help or info, setup CLI normally
# move into its own function so that child classes can customize
# above options without having to rewrite below
self._framework_init(code_root, defaults_rel_path)
[docs] def cleanup_tempdirs(self, signum=None, frame=None):
# delete temp files
util.signal_logger(self.__class__.__name__, signum, frame)
config = util_mdtf.ConfigManager()
tmpdirs = util_mdtf.TempDirManager()
if not config.config.get('keep_temp', False):
tmpdirs.cleanup()
[docs] def _framework_init(self, code_root, defaults_rel_path):
# set up CLI and parse arguments
# print('\tDEBUG: argv = {}'.format(sys.argv[1:]))
cli_obj = cli.FrameworkCLIHandler(code_root, defaults_rel_path)
self._cli_pre_parse_hook(cli_obj)
cli_obj.parse_cli()
self._cli_post_parse_hook(cli_obj)
# load pod data
pod_info_tuple = cli.load_pod_settings(code_root)
# do nontrivial parsing
config = util_mdtf.ConfigManager(cli_obj, pod_info_tuple)
print(util.pretty_print_json(config.paths))
self.parse_mdtf_args(cli_obj, config)
# config should be read-only from here on
self._post_parse_hook(cli_obj, config)
self._print_config(cli_obj, config)
[docs] def _cli_pre_parse_hook(self, cli_obj):
# gives subclasses the ability to customize CLI handler before parsing
# although most of the work done by parse_mdtf_args
pass
[docs] def _cli_post_parse_hook(self, cli_obj):
# gives subclasses the ability to customize CLI handler after parsing
# although most of the work done by parse_mdtf_args
if cli_obj.config.get('dry_run', False):
cli_obj.config['test_mode'] = True
[docs] @staticmethod
def _populate_from_cli(cli_obj, group_nm, target_d=None):
if target_d is None:
target_d = dict()
for key, val in cli_obj.iteritems_cli(group_nm):
if val: # assign nonempty items only
target_d[key] = val
return target_d
[docs] def parse_mdtf_args(self, cli_obj, config):
"""Parse script options returned by the CLI. For greater customizability,
most of the functionality is spun out into sub-methods.
"""
self.parse_env_vars(cli_obj, config)
self.parse_pod_list(cli_obj, config)
self.parse_case_list(cli_obj, config)
self.parse_paths(cli_obj, config)
[docs] def parse_env_vars(self, cli_obj, config):
# don't think PODs use global env vars?
# self.envvars = self._populate_from_cli(cli_obj, 'PATHS', self.envvars)
config.global_envvars['RGB'] = os.path.join(self.code_root,'src','rgb')
# globally enforce non-interactive matplotlib backend
# see https://matplotlib.org/3.2.2/tutorials/introductory/usage.html#what-is-a-backend
config.global_envvars['MPLBACKEND'] = "Agg"
[docs] def parse_pod_list(self, cli_obj, config):
self.pod_list = []
args = util.coerce_to_iter(config.config.pop('pods', []), set)
if 'example' in args or 'examples' in args:
self.pod_list = [pod for pod in config.pods \
if pod.startswith('example')]
elif 'all' in args:
self.pod_list = [pod for pod in config.pods \
if not pod.startswith('example')]
else:
# specify pods by realm
realms = args.intersection(set(config.all_realms))
args = args.difference(set(config.all_realms)) # remainder
for key in config.pod_realms:
if util.coerce_to_iter(key, set).issubset(realms):
self.pod_list.extend(config.pod_realms[key])
# specify pods by name
pods = args.intersection(set(config.pods))
self.pod_list.extend(list(pods))
for arg in args.difference(set(config.pods)): # remainder:
print("WARNING: Didn't recognize POD {}, ignoring".format(arg))
# exclude examples
self.pod_list = [pod for pod in self.pod_list \
if not pod.startswith('example')]
if not self.pod_list:
print(("WARNING: no PODs selected to be run. Do `./mdtf info pods`"
" for a list of available PODs, and check your -p/--pods argument."))
print('Received --pods = {}'.format(list(args)))
exit()
[docs] def parse_case_list(self, cli_obj, config):
case_list_in = util.coerce_to_iter(cli_obj.case_list)
cli_d = self._populate_from_cli(cli_obj, 'MODEL')
if 'CASE_ROOT_DIR' not in cli_d and cli_obj.config.get('root_dir', None):
# CASE_ROOT was set positionally
cli_d['CASE_ROOT_DIR'] = cli_obj.config['root_dir']
if not case_list_in:
case_list_in = [cli_d]
case_list = []
for case_tup in enumerate(case_list_in):
case_list.append(self.parse_case(case_tup, cli_d, cli_obj, config))
self.case_list = [case for case in case_list if case is not None]
if not self.case_list:
print("ERROR: no valid entries in case_list. Please specify model run information.")
print('Received:')
print(util.pretty_print_json(case_list_in))
exit(1)
[docs] def parse_case(self, case_tup, cli_d, cli_obj, config):
n, d = case_tup
if 'CASE_ROOT_DIR' not in d and 'root_dir' in d:
d['CASE_ROOT_DIR'] = d.pop('root_dir')
case_convention = d.get('convention', '')
d.update(cli_d)
if case_convention:
d['convention'] = case_convention
if not ('CASENAME' in d or ('model' in d and 'experiment' in d)):
print(("WARNING: Need to specify either CASENAME or model/experiment "
"in caselist entry {}, skipping.").format(n+1))
return None
_ = d.setdefault('model', d.get('convention', ''))
_ = d.setdefault('experiment', '')
_ = d.setdefault('CASENAME', '{}_{}'.format(d['model'], d['experiment']))
for field in ['FIRSTYR', 'LASTYR', 'convention']:
if not d.get(field, None):
print(("WARNING: No value set for {} in caselist entry {}, "
"skipping.").format(field, n+1))
return None
# if pods set from CLI, overwrite pods in case list
d['pod_list'] = self.set_case_pod_list(d, cli_obj, config)
return d
[docs] def set_case_pod_list(self, case, cli_obj, config):
# if pods set from CLI, overwrite pods in case list
# already finalized self.pod-list by the time we get here
if not cli_obj.is_default['pods'] or not case.get('pod_list', None):
return self.pod_list
else:
return case['pod_list']
[docs] def parse_paths(self, cli_obj, config):
config.paths.parse(cli_obj.config, cli_obj.custom_types.get('path', []))
[docs] def _post_parse_hook(self, cli_obj, config):
# init other services
_ = util_mdtf.TempDirManager()
_ = util_mdtf.VariableTranslator()
self.verify_paths(config)
[docs] def verify_paths(self, config):
# clean out WORKING_DIR if we're not keeping temp files
if os.path.exists(config.paths.WORKING_DIR) and not \
(config.config.get('keep_temp', False) \
or config.paths.WORKING_DIR == config.paths.OUTPUT_DIR):
shutil.rmtree(config.paths.WORKING_DIR)
util_mdtf.check_required_dirs(
already_exist = [
config.paths.CODE_ROOT, config.paths.OBS_DATA_ROOT
],
create_if_nec = [
config.paths.MODEL_DATA_ROOT,
config.paths.WORKING_DIR,
config.paths.OUTPUT_DIR
])
[docs] def _print_config(self, cli_obj, config):
# make config nested dict for backwards compatibility
# this is all temporary
d = dict()
for n, case in enumerate(self.case_list):
key = 'case_list({})'.format(n)
d[key] = case
d['pod_list'] = self.pod_list
d['paths'] = config.paths
d['paths'].pop('_unittest', None)
d['settings'] = dict()
settings_gps = set(cli_obj.parser_groups).difference(
set(['parser','PATHS','MODEL','DIAGNOSTICS'])
)
for group in settings_gps:
d['settings'] = self._populate_from_cli(cli_obj, group, d['settings'])
d['settings'] = {k:v for k,v in iter(d['settings'].items()) \
if k not in d['paths']}
d['envvars'] = config.global_envvars
print('DEBUG: SETTINGS:')
print(util.pretty_print_json(d))
_dispatch_search = [
data_manager, environment_manager, shared_diagnostic, netcdf_helper
]
[docs] def manual_dispatch(self, config):
def _dispatch(setting, class_suffix):
class_prefix = config.config.get(setting, '')
class_prefix = util.coerce_from_iter(class_prefix)
# drop '_' and title-case class name
class_prefix = ''.join(class_prefix.split('_')).title()
for mod in self._dispatch_search:
try:
return getattr(mod, class_prefix+class_suffix)
except:
continue
print("No class named {}.".format(class_prefix+class_suffix))
raise Exception('no_class')
self.DataManager = _dispatch('data_manager', 'DataManager')
self.EnvironmentManager = _dispatch('environment_manager', 'EnvironmentManager')
self.Diagnostic = _dispatch('diagnostic', 'Diagnostic')
self.NetCDFHelper = _dispatch('netcdf_helper', 'NetcdfHelper')
[docs] def main_loop(self):
config = util_mdtf.ConfigManager()
self.manual_dispatch(config)
caselist = []
# only run first case in list until dependence on env vars cleaned up
for case_dict in self.case_list[0:1]:
case = self.DataManager(case_dict)
for pod_name in case.pod_list:
try:
pod = self.Diagnostic(pod_name)
except AssertionError as error:
print(str(error))
case.pods.append(pod)
case.setUp()
case.fetch_data()
caselist.append(case)
for case in caselist:
env_mgr = self.EnvironmentManager(config)
env_mgr.pods = case.pods # best way to do this?
# nc_helper = self.NetCDFHelper()
# case.preprocess_local_data(
# netcdf_mixin=nc_helper, environment_manager=env_mgr
# )
env_mgr.setUp()
env_mgr.run()
env_mgr.tearDown()
for case in caselist:
case.tearDown()
self.cleanup_tempdirs()
# should move this out of "src" package, but need to create wrapper shell script
# to set framework conda env.
if __name__ == '__main__':
# get dir of currently executing script:
cwd = os.path.dirname(os.path.realpath(__file__))
code_root, src_dir = os.path.split(cwd)
defaults_rel_path = os.path.join(src_dir, 'cli.jsonc')
if not os.path.exists(defaults_rel_path):
# print('Warning: site-specific cli.jsonc not found, using template.')
defaults_rel_path = os.path.join(src_dir, 'cli_template.jsonc')
mdtf = MDTFFramework(code_root, defaults_rel_path)
print("\n======= Starting {}".format(__file__))
mdtf.main_loop()
print("Exiting normally from {}".format(__file__))