from __future__ import absolute_import, division, print_function, unicode_literals
import os
from src import six
import re
from src import datelabel
from src import util
from src import util_mdtf
class CMIP6_CVs(util.Singleton):
    """Query interface to the CMIP6 controlled vocabulary (CV) table.

    The ``'CV'`` entry of ``CMIP6_CV.json`` is read once at construction.
    Membership tests (:meth:`is_in_cv`) and cross-category lookups
    (:meth:`get_lookup`, :meth:`lookup`) are answered from that data; derived
    tables are built on first use and cached on the instance.
    """

    def __init__(self, unittest=False):
        """Load the CV table and pre-process it.

        Args:
            unittest: if True, the ConfigManager is not consulted and a
                placeholder filename is passed to ``util.read_json``.
        """
        if unittest:
            # value not used, when we're testing will mock out call to read_json
            # below with actual translation table to use for test
            file_ = 'dummy_filename'
        else:
            config = util_mdtf.ConfigManager()
            file_ = os.path.join(
                config.paths.CODE_ROOT, 'src',
                'cmip6-cmor-tables', 'Tables', 'CMIP6_CV.json'
            )
        self._contents = util.read_json(file_)['CV']
        # These top-level entries are discarded before any lookup is built.
        # pop() rather than del: a table that lacks one of them is still usable.
        for k in ['product', 'version_metadata', 'required_global_attributes',
                  'further_info_url', 'Conventions', 'license']:
            self._contents.pop(k, None)
        # munge table_ids: replace each table_id's value with its parsed fields
        self._contents['table_id'] = {
            tbl: parse_mip_table_id(tbl) for tbl in self._contents['table_id']
        }
        self.cv = dict()
        self._lookups = dict()

    def _make_cv(self):
        """Populate ``self.cv`` from ``self._contents`` on first use."""
        # make on-demand
        if self.cv:
            return
        for k in self._contents:
            self.cv[k] = util.coerce_to_iter(self._contents[k])

    def is_in_cv(self, category, items):
        """Test whether ``items`` belong to the CV ``category``.

        Args:
            category: name of a CV category; must be a key of ``self.cv``.
            items: a single value, or an iterable of values (as decided by
                ``util.is_iterable``).

        Returns:
            A list of bools, one per item, if ``items`` is iterable; otherwise
            a single bool.

        Raises:
            AssertionError: if ``category`` is not a known category.
        """
        self._make_cv()
        assert category in self.cv
        if util.is_iterable(items):
            return [(item in self.cv[category]) for item in items]
        else:
            return (items in self.cv[category])

    def get_lookup(self, source, dest):
        """Return the lookup table from category ``source`` to ``dest``.

        A table built directly from ``source`` is cached under
        ``(source, dest)``. When only the opposite direction is available, the
        ``inverse()`` of that table is returned instead.

        Args:
            source: category whose entries are the keys of the lookup.
            dest: category/attribute whose values are looked up.

        Returns:
            A ``util.MultiMap`` (or the result of its ``inverse()``).

        Raises:
            KeyError: if neither name is a CV category, or if the attribute
                is missing from the entries of the category used to build
                the table.
        """
        if (source, dest) in self._lookups:
            return self._lookups[(source, dest)]
        if (dest, source) in self._lookups:
            return self._lookups[(dest, source)].inverse()
        if source in self._contents:
            entries = self._contents[source]
            # only the first entry is inspected to decide whether `dest` is
            # an attribute of this category
            first_key = list(entries)[0]
            if dest not in entries[first_key]:
                raise KeyError(
                    "Can't find {} in attributes for {}.".format(dest, source))
            mm = util.MultiMap()
            for k in entries:
                mm[k].update(util.coerce_to_iter(entries[k][dest], set))
            self._lookups[(source, dest)] = mm
            return mm
        if dest in self._contents:
            # The (dest, source) table is not cached at this point (checked
            # above), so indexing self._lookups directly would always raise.
            # Build it through get_lookup, then invert.
            return self.get_lookup(dest, source).inverse()
        raise KeyError('Neither {} or {} in CV table list.'.format(source, dest))

    def lookup(self, source_items, source, dest):
        """Translate value(s) of category ``source`` to category ``dest``.

        Args:
            source_items: a single value or an iterable of values.
            source: category that ``source_items`` belong to.
            dest: category to translate to.

        Returns:
            A list with one translated result per item if ``source_items`` is
            iterable; otherwise a single result. Each result is passed through
            ``util.coerce_from_iter``.
        """
        _lookup = self.get_lookup(source, dest)
        if util.is_iterable(source_items):
            return [util.coerce_from_iter(_lookup[item]) for item in source_items]
        else:
            return util.coerce_from_iter(_lookup[source_items])

    # ----------------------------------

    def table_id_from_freq(self, date_freq):
        """Return the table_ids whose parsed ``date_freq`` equals ``date_freq``.

        Raises:
            AssertionError: if the CV has no ``'table_id'`` category.
        """
        self._make_cv()
        assert 'table_id' in self.cv
        return [tbl for tbl in self.cv['table_id']
                if (parse_mip_table_id(tbl)['date_freq'] == date_freq)]
@six.python_2_unicode_compatible
class CMIP6DateFrequency(datelabel.DateFrequency):
    """Date frequency built from CMIP6 frequency strings.

    ``parse_mip_table_id`` constructs instances from the frequency part of a
    MIP table id.

    Reference: `<http://goo.gl/v1drZl>`__, page 16
    """
    # Integer precision assigned to each frequency unit.
    # NOTE(review): presumably consumed by datelabel.DateFrequency -- confirm.
    _precision_lookup = {
        'fx': 0, 'yr': 1, 'mo': 2, 'day': 3,
        'hr': 5, # includes minutes
        'min': 6, # = subhr, minutes and seconds
    }
    # Whole-string pattern: optional quantity (1, 3 or 6), a lazily matched
    # run of lowercase letters as the unit, and an optional averaging suffix.
    _regex = re.compile(r"""
        ^
        (?P<quantity>(1|3|6)?)
        (?P<unit>[a-z]*?)
        (?P<avg>(C|CM|Pt)?)
        $
    """, re.VERBOSE)

    # NOTE(review): no ``format`` method is defined in this class body as
    # shown, so this name resolves to the builtin ``format`` rather than a
    # method of the class -- confirm that is intended.
    __str__ = format
# --------------------------------------
# Pattern for MIP table identifiers. It splits the whole string into four
# named groups: table_prefix, table_freq, table_suffix and table_qualifier.
# Every group is optional, so the empty string also matches.
# see https://earthsystemcog.org/projects/wip/mip_table_about
# (which doesn't cover all cases)
mip_table_regex = re.compile(r"""
^ # start of line
(?P<table_prefix>(A|CF|E|I|AER|O|L|LI|SI)?)
# maybe a digit, followed by as few lowercase letters as possible:
(?P<table_freq>\d?[a-z]*?)
(?P<table_suffix>(ClimMon|Lev|Plev|Ant|Gre)?)
(?P<table_qualifier>(Pt|Z|Off)?)
$ # end of line - necessary for lazy capture to work
""", re.VERBOSE)
def parse_mip_table_id(mip_table):
    """Parse a MIP table identifier into its component fields.

    Args:
        mip_table: table id string, matched against ``mip_table_regex``.

    Returns:
        dict with the regex groups (``table_prefix``, ``table_freq``,
        ``table_suffix``, ``table_qualifier``) plus:

        - ``table_id``: the input string;
        - ``date_freq``: a ``CMIP6DateFrequency``;
        - ``spatial_avg``: ``'zonal_mean'`` for qualifier ``Z``, else None;
        - ``temporal_avg``: ``'point'`` for qualifier ``Pt``, else
          ``'interval'``;
        - ``region``: ``'Antarctica'`` for suffix ``Ant``, ``'Greenland'`` for
          suffix ``Gre``, else None.

    Raises:
        ValueError: if ``mip_table`` does not match ``mip_table_regex``.
    """
    match = mip_table_regex.match(mip_table)
    if not match:
        raise ValueError("Can't parse table {}.".format(mip_table))

    md = match.groupdict()
    md['table_id'] = mip_table

    # the 'clim' frequency token is treated as monthly
    if md['table_freq'] == 'clim':
        md['date_freq'] = CMIP6DateFrequency('mon')
    else:
        md['date_freq'] = CMIP6DateFrequency(md['table_freq'])

    md['spatial_avg'] = 'zonal_mean' if md['table_qualifier'] == 'Z' else None
    md['temporal_avg'] = (
        'point' if md['table_qualifier'] == 'Pt' else 'interval'
    )

    # The suffix group can only capture ClimMon/Lev/Plev/Ant/Gre, so the
    # region must be keyed on 'Ant'/'Gre'; comparing against 'a'/'g' could
    # never succeed.
    if md['table_suffix'] == 'Ant':
        md['region'] = 'Antarctica'
    elif md['table_suffix'] == 'Gre':
        md['region'] = 'Greenland'
    else:
        md['region'] = None
    return md
# Pattern for grid labels: a literal 'g' followed by optional one-character
# flags. Used with match(), so it is anchored at the start of the string only.
grid_label_regex = re.compile(r"""
    g
    (?P<global_mean>m?)
    (?P<regrid>n|r?)
    (?P<num>\d?)
    (?P<region>a|g?)
    (?P<zonal_mean>z?)
""", re.VERBOSE)


def parse_grid_label(grid_label):
    """Parse a grid label string into its component fields.

    Args:
        grid_label: grid label string, matched against ``grid_label_regex``.

    Returns:
        dict with keys:

        - ``grid_label``: the input string;
        - ``spatial_avg``: ``'global_mean'``, ``'zonal_mean'`` or None;
        - ``native_grid``: False when the regrid flag is ``r``, else True;
        - ``grid_number``: the digit in the label as an int, 0 if absent;
        - ``region``: ``'Antarctica'``, ``'Greenland'`` or None.

    Raises:
        ValueError: if ``grid_label`` does not match ``grid_label_regex``.
    """
    match = grid_label_regex.match(grid_label)
    if not match:
        raise ValueError("Can't parse grid {}.".format(grid_label))

    md = match.groupdict()
    ans = dict()
    ans['grid_label'] = grid_label

    if md['global_mean']:
        ans['spatial_avg'] = 'global_mean'
    elif md['zonal_mean']:
        ans['spatial_avg'] = 'zonal_mean'
    else:
        ans['spatial_avg'] = None

    ans['native_grid'] = not (md['regrid'] == 'r')

    # Always an int: the captured digit is a string, while the "absent"
    # default is the integer 0.
    ans['grid_number'] = int(md['num']) if md['num'] else 0

    if md['region'] == 'a':
        ans['region'] = 'Antarctica'
    elif md['region'] == 'g':
        ans['region'] = 'Greenland'
    else:
        ans['region'] = None
    return ans
# Pattern for a DRS directory path: an optional leading separator and
# optional "CMIP6/" component, then eight '/'-separated named fields and a
# "v"-prefixed all-digit version. parse_DRS_directory applies it with
# re.match, so it is anchored at the start of the string only.
# member_id is matched with \w+, which does not accept '-'.
drs_directory_regex = re.compile(r"""
/? # maybe initial separator
(CMIP6/)?
(?P<activity_id>\w+)/
(?P<institution_id>[a-zA-Z0-9_-]+)/
(?P<source_id>[a-zA-Z0-9_-]+)/
(?P<experiment_id>[a-zA-Z0-9_-]+)/
(?P<member_id>\w+)/
(?P<table_id>\w+)/
(?P<variable_id>\w+)/
(?P<grid_label>\w+)/
v(?P<version_date>\d+)
/? # maybe final separator
""", re.VERBOSE)
# TODO: parse subexperiments!
def parse_DRS_directory(dir_):
    """Split a DRS directory path into its named fields.

    Args:
        dir_: directory path, matched against ``drs_directory_regex``.

    Returns:
        dict of the regex's named groups, with ``version_date`` converted to
        a ``datelabel.Date`` and the fields produced by
        ``parse_mip_table_id`` for the path's ``table_id`` merged in.

    Raises:
        ValueError: if ``dir_`` does not match ``drs_directory_regex``.
    """
    parsed = drs_directory_regex.match(dir_)
    if parsed is None:
        raise ValueError("Can't parse dir {}.".format(dir_))

    fields = parsed.groupdict()
    fields['version_date'] = datelabel.Date(fields['version_date'])
    table_fields = parse_mip_table_id(fields['table_id'])
    fields.update(table_fields)
    return fields
# Pattern for a DRS filename: six '_'-separated named fields, then a
# mandatory "<start>-<end>" all-digit date range and the ".nc" extension.
# parse_DRS_filename applies it with re.match, so it is anchored at the start
# of the string only; nothing constrains what follows ".nc".
drs_filename_regex = re.compile(r"""
(?P<variable_id>\w+)_ # field name
(?P<table_id>\w+)_ # field name
(?P<source_id>[a-zA-Z0-9_-]+)_ # field name
(?P<experiment_id>[a-zA-Z0-9_-]+)_ # field name
(?P<realization_code>\w+)_ # field name
(?P<grid_label>\w+)_ # field name
(?P<start_date>\d+)-(?P<end_date>\d+) # file's date range
\.nc # netCDF file extension
""", re.VERBOSE)
def parse_DRS_filename(file_):
    """Split a DRS filename into its named fields.

    Args:
        file_: filename, matched against ``drs_filename_regex``.

    Returns:
        dict of the regex's named groups, with ``start_date`` and
        ``end_date`` converted to ``datelabel.Date``, a ``date_range`` entry
        (``datelabel.DateRange`` of those two dates), and the fields produced
        by ``parse_mip_table_id`` for the file's ``table_id`` merged in.

    Raises:
        ValueError: if ``file_`` does not match ``drs_filename_regex``.
    """
    parsed = drs_filename_regex.match(file_)
    if parsed is None:
        raise ValueError("Can't parse file {}.".format(file_))

    fields = parsed.groupdict()
    first = datelabel.Date(fields['start_date'])
    fields['start_date'] = first
    last = datelabel.Date(fields['end_date'])
    fields['end_date'] = last
    fields['date_range'] = datelabel.DateRange(first, last)
    fields.update(parse_mip_table_id(fields['table_id']))
    return fields
def parse_DRS_path(*args):
    """Parse a full DRS path into the union of directory and filename fields.

    Args:
        *args: either one full path (split with ``os.path.split``), or a
            ``(directory, filename)`` pair.

    Returns:
        dict: the result of ``parse_DRS_directory`` updated with the result
        of ``parse_DRS_filename``.

    Raises:
        ValueError: if called with a number of arguments other than 1 or 2,
            if either part cannot be parsed, or if a field present in both
            parts has different values.
    """
    if len(args) == 1:
        dir_, file_ = os.path.split(args[0])
    elif len(args) == 2:
        dir_, file_ = args
    else:
        # Same exception type as before, now with a diagnostic message.
        raise ValueError(
            "parse_DRS_path takes a full path or a (directory, filename) "
            "pair; got {} arguments.".format(len(args)))

    d1 = parse_DRS_directory(dir_)
    d2 = parse_DRS_filename(file_)
    # fields parsed from both parts must agree
    for key in set(d1).intersection(d2):
        if d1[key] != d2[key]:
            raise ValueError("{} fields inconsistent in parsing {}".format(
                key, args))
    d1.update(d2)
    return d1