from __future__ import absolute_import, division, print_function, unicode_literals
import os
import io
from src import six
import glob
import shutil
import atexit
import signal
from abc import ABCMeta, abstractmethod
if os.name == 'posix' and six.PY2:
try:
import subprocess32 as subprocess
except ImportError:
import subprocess
else:
import subprocess
from src import util
from src import util_mdtf
from src.shared_diagnostic import PodRequirementFailure
[docs]class EnvironmentManager(six.with_metaclass(ABCMeta)):
# analogue of TestSuite in xUnit - abstract base class
[docs] def __init__(self, verbose=0):
config = util_mdtf.ConfigManager()
self.test_mode = config.config.test_mode
self.pods = []
self.envs = set()
# kill any subprocesses that are still active if we exit normally
# (shouldn't be necessary) or are killed
atexit.register(self.subprocess_cleanup)
signal.signal(signal.SIGTERM, self.subprocess_cleanup)
signal.signal(signal.SIGINT, self.subprocess_cleanup)
# -------------------------------------
# following are specific details that must be implemented in child class
[docs] @abstractmethod
def create_environment(self, env_name):
pass
[docs] @abstractmethod
def set_pod_env(self, pod):
pass
[docs] @abstractmethod
def activate_env_commands(self, env_name):
pass
[docs] @abstractmethod
def deactivate_env_commands(self, env_name):
pass
[docs] @abstractmethod
def destroy_environment(self, env_name):
pass
# -------------------------------------
[docs] def setUp(self):
for pod in self.pods:
self.set_pod_env(pod)
self.envs.add(pod.env)
for env in self.envs:
self.create_environment(env)
# -------------------------------------
[docs] def run(self, verbose=0):
for pod in self.pods:
pod._setup_pod_directories() # should refactor setUp
pod.logfile_obj = io.open(
os.path.join(pod.POD_WK_DIR, pod.name+".log"),
'w', encoding='utf-8'
)
log_str = "--- MDTF.py Starting POD {}\n".format(pod.name)
pod.logfile_obj.write(log_str)
if verbose > 0: print(log_str)
try:
pod.setUp()
except PodRequirementFailure as exc:
log_str = "\nSkipping execution of {}.\nReason: {}\n".format(
exc.pod.name, str(exc))
pod.logfile_obj.write(log_str)
pod.logfile_obj.close()
pod.logfile_obj = None
print(log_str)
pod.skipped = exc
continue
print("{} will run in env: {}".format(pod.name, pod.env))
pod.logfile_obj.write("\n".join(
["Found files: "] + pod.found_files + [" "]))
env_list = ["{}: {}". format(k,v) for k,v in iter(pod.pod_env_vars.items())]
pod.logfile_obj.write("\n".join(
["Env vars: "] + sorted(env_list) + [" "]))
try:
pod.logfile_obj.write("--- MDTF.py calling POD {}\n\n".format(pod.name))
pod.logfile_obj.flush()
pod.process_obj = self.spawn_subprocess(
pod.validate_commands() + pod.run_commands(),
pod.env,
env = os.environ, cwd = pod.POD_WK_DIR,
stdout = pod.logfile_obj, stderr = subprocess.STDOUT
)
except OSError as exc:
print('ERROR :', exc.errno, exc.strerror)
print(" occured with call: {}".format(pod.run_commands()))
pod.skipped = exc
pod.logfile_obj.close()
pod.logfile_obj = None
continue
# if this were python3 we'd have asyncio, instead wait for each process
# to terminate and close all log files
for pod in self.pods:
if pod.process_obj is not None:
pod.process_obj.wait()
pod.process_obj = None
if pod.logfile_obj is not None:
pod.logfile_obj.close()
pod.logfile_obj = None
[docs] def spawn_subprocess(self, cmd_list, env_name,
env=None, cwd=None, stdout=None, stderr=None):
if stdout is None:
stdout = subprocess.STDOUT
if stderr is None:
stderr = subprocess.STDOUT
run_cmds = util.coerce_to_iter(cmd_list, list)
if self.test_mode:
run_cmds = ['echo "TEST MODE: call {}"'.format('; '.join(run_cmds))]
commands = self.activate_env_commands(env_name) \
+ run_cmds \
+ self.deactivate_env_commands(env_name)
# '&&' so we abort if any command in the sequence fails.
if self.test_mode:
for cmd in commands:
print('TEST MODE: call {}'.format(cmd))
else:
print("Calling : {}".format(run_cmds[-1]))
commands = ' && '.join([s for s in commands if s])
# Need to run bash explicitly because 'conda activate' sources
# env vars (can't do that in posix sh). tcsh could also work.
return subprocess.Popen(
['bash', '-c', commands],
env=env, cwd=cwd, stdout=stdout, stderr=stderr
)
# -------------------------------------
[docs] def tearDown(self):
# call diag's tearDown to clean up
for pod in self.pods:
pod.tearDown()
for env in self.envs:
self.destroy_environment(env)
[docs] def subprocess_cleanup(self, signum=None, frame=None):
util.signal_logger(self.__class__.__name__, signum, frame)
# kill any active subprocesses
for pod in self.pods:
if pod.process_obj is not None:
pod.process_obj.kill()
[docs]class NoneEnvironmentManager(EnvironmentManager):
# Do not attempt to switch execution environments for each POD.
[docs] def create_environment(self, env_name):
pass
[docs] def destroy_environment(self, env_name):
pass
[docs] def set_pod_env(self, pod):
pass
[docs] def activate_env_commands(self, env_name):
return []
[docs] def deactivate_env_commands(self, env_name):
return []
[docs]class VirtualenvEnvironmentManager(EnvironmentManager):
# create Python virtualenv to manage environments.
# for R, use xxx.
# Do not attempt management for NCL.
[docs] def __init__(self, verbose=0):
super(VirtualenvEnvironmentManager, self).__init__(verbose)
config = util_mdtf.ConfigManager()
self.venv_root = config.paths.get('venv_root', '')
self.r_lib_root = config.paths.get('r_lib_root', '')
[docs] def create_environment(self, env_name):
if env_name.startswith('py_'):
self._create_py_venv(env_name)
elif env_name.startswith('r_'):
self._create_r_venv(env_name)
else:
pass
[docs] def _create_py_venv(self, env_name):
py_pkgs = set()
for pod in self.pods:
if pod.env == env_name:
py_pkgs.update(set(pod.required_python_modules))
env_path = os.path.join(self.venv_root, env_name)
if not os.path.isdir(env_path):
os.makedirs(env_path) # recursive mkdir if needed
for cmd in [
'python -m virtualenv {}'.format(env_path),
'source {}/bin/activate'.format(env_path),
'pip install {}'.format(' '.join(py_pkgs)),
'deactivate'
]:
util.run_shell_command(cmd)
[docs] def _create_r_venv(self, env_name):
r_pkgs = set()
for pod in self.pods:
if pod.env == env_name:
r_pkgs.update(set(pod.required_r_packages))
r_pkg_str = ', '.join(['"'+x+'"' for x in r_pkgs])
if self.r_lib_root != '':
env_path = os.path.join(self.r_lib_root, env_name)
if not os.path.isdir(env_path):
os.makedirs(env_path) # recursive mkdir if needed
cmds = [
'export R_LIBS_USER="{}"'.format(env_path),
'Rscript -e \'install.packages(c({}), '.format(r_pkg_str) \
+ 'lib=Sys.getenv("R_LIBS_USER"))\''
]
else:
cmds = [
'Rscript -e \'install.packages(c({}))\''.format(r_pkg_str)
]
for cmd in cmds:
util.run_shell_command(cmd)
[docs] def destroy_environment(self, env_name):
pass
[docs] def set_pod_env(self, pod):
langs = [s.lower() for s in pod.runtime_requirements]
if ('r' in langs) or ('rscript' in langs):
pod.env = 'r_' + pod.name
elif 'ncl' in langs:
pod.env = 'ncl'
else:
pod.env = 'py_' + pod.name
[docs] def activate_env_commands(self, env_name):
if env_name.startswith('py_'):
env_path = os.path.join(self.venv_root, env_name)
return ['source {}/bin/activate'.format(env_path)]
elif env_name.startswith('r_'):
env_path = os.path.join(self.r_lib_root, env_name)
return ['export R_LIBS_USER="{}"'.format(env_path)]
else:
return []
[docs] def deactivate_env_commands(self, env_name):
if env_name.startswith('py_'):
return ['deactivate']
elif env_name.startswith('r_'):
return ['unset R_LIBS_USER']
else:
return []
[docs]class CondaEnvironmentManager(EnvironmentManager):
# Use Anaconda to switch execution environments.
env_name_prefix = '_MDTF_' # our envs start with this string to avoid conflicts
[docs] def __init__(self, verbose=0):
super(CondaEnvironmentManager, self).__init__(verbose)
config = util_mdtf.ConfigManager()
self.code_root = config.paths.CODE_ROOT
self.conda_dir = os.path.join(self.code_root, 'src','conda')
self.env_list = []
for file_ in os.listdir(self.conda_dir):
if file_.endswith('.yml'):
name, _ = os.path.splitext(file_)
self.env_list.append(name.split('env_')[-1])
# find conda executable
# conda_init for bash defines conda as a shell function; will get error
# if we try to call the conda executable directly
try:
conda_info = util.run_shell_command(
'{}/conda_init.sh {}'.format(
self.conda_dir, config.paths.get('conda_root','')
))
for line in conda_info:
key, val = line.split('=')
if key == '_CONDA_EXE':
self.conda_exe = val
assert os.path.exists(self.conda_exe)
elif key == '_CONDA_ROOT':
self.conda_root = val
except:
print("Error: can't find conda.")
raise
# find where environments are installed
if 'conda_env_root' in config.paths and config.paths.conda_env_root:
self.conda_env_root = config.paths.conda_env_root
if not os.path.isdir(self.conda_env_root):
os.makedirs(self.conda_env_root) # recursive mkdir if needed
else:
# only true in default anaconda install, may need to fix
self.conda_env_root = os.path.join(self.conda_root, 'envs')
[docs] def create_environment(self, env_name):
# check to see if conda env exists, and if not, try to create it
conda_prefix = os.path.join(self.conda_env_root, env_name)
try:
_ = util.run_shell_command(
'{} env list | grep -qF "{}"'.format(self.conda_exe, conda_prefix)
)
except:
print('Conda env {} not found (grepped for {})'.format(env_name,conda_prefix))
#self._call_conda_create(env_name)
[docs] def _call_conda_create(self, env_name):
if env_name.startswith(self.env_name_prefix):
short_name = env_name[(len(self.env_name_prefix)+1):]
else:
short_name = env_name
path = '{}/env_{}.yml'.format(self.conda_dir, short_name)
if not os.path.exists(path):
print("Can't find {}".format(path))
else:
conda_prefix = os.path.join(self.conda_env_root, env_name)
print('Creating conda env {} in {}'.format(env_name, conda_prefix))
command = \
'source {}/conda_init.sh {} && '.format(
self.conda_dir, self.conda_root
) + '{} env create --force -q -p "{}" -f "{}"'.format(
self.conda_exe, conda_prefix, path
)
try:
_ = util.run_shell_command(command)
except:
raise
[docs] def create_all_environments(self):
try:
_ = util.run_shell_command(
'{}/conda_env_setup.sh -c "{}" -d "{}" --all'.format(
self.conda_dir, self.conda_exe, self.conda_env_root
))
except:
raise
[docs] def destroy_environment(self, env_name):
pass
[docs] def set_pod_env(self, pod):
if pod.name in self.env_list:
# env created specifically for this POD
pod.env = self.env_name_prefix + pod.name
else:
langs = [s.lower() for s in pod.runtime_requirements]
if ('r' in langs) or ('rscript' in langs):
pod.env = self.env_name_prefix + 'R_base'
elif 'ncl' in langs:
pod.env = self.env_name_prefix + 'NCL_base'
elif 'python2' in langs:
raise NotImplementedError('Python 2 not supported for new PODs.')
# pod.env = self.env_name_prefix + 'python2_base'
elif 'python3' in langs:
pod.env = self.env_name_prefix + 'python3_base'
else:
print("Can't find environment providing {}".format(
pod.runtime_requirements))
[docs] def activate_env_commands(self, env_name):
"""Source conda_init.sh to set things that aren't set b/c we aren't
in an interactive shell.
"""
# conda_init for bash defines conda as a shell function; will get error
# if we try to call the conda executable directly
conda_prefix = os.path.join(self.conda_env_root, env_name)
return [
'source {}/conda_init.sh {}'.format(
self.conda_dir, self.conda_root
),
'conda activate {}'.format(conda_prefix)
]
[docs] def deactivate_env_commands(self, env_name):
return []