Source code for src.units

"""Functions wrapping unit conversion methods from the third-party
`cfunits <https://ncas-cms.github.io/cfunits/index.html>`__ library.
"""
import cfunits
import re
import logging

_log = logging.getLogger(__name__)


[docs] class Units(cfunits.Units): """Wrap `Units <https://ncas-cms.github.io/cfunits/cfunits.Units.html>`__ class of `cfunits <https://ncas-cms.github.io/cfunits/>`__ to isolate this third-party dependency to the code in this module. """
[docs] def reftime_base_eq(self, other): """Comparison function that recognizes reference time units (e.g., 'days since 1970-01-01') as being equal to unqualified time units of the same base unit ('days'). The cfunits `equivalent() <https://ncas-cms.github.io/cfunits/generated/cfunits.Units.equivalent.html>`__ method returns False on these cases. """ cls = type(self) if self.isreftime and other.isreftime: return self.equals(other) self_2 = (cls(self._units_since_reftime) if self.isreftime else self) other_2 = (cls(other._units_since_reftime) if other.isreftime else other) return self_2.equals(other_2)
[docs] def to_cfunits(*args): """Coerce string-valued units and (quantity, unit) tuples to :class:`Units` objects. If more than one such argument is given in *args*, return a list containing the results of coercing each argument. """ def _coerce(u): if isinstance(u, tuple): # (quantity, unit) tuple assert len(u) == 2 u = u[0] * Units(u[1]) if not isinstance(u, Units): u = Units(u) return u if len(args) == 1: return _coerce(args[0]) else: return [_coerce(arg) for arg in args]
[docs] def to_equivalent_units(*args): """Same as :func:`to_cfunits`, but raises TypeError if units of all quantities in *args* are not equivalent after coercion. """ args = to_cfunits(*args) ref_unit = args.pop() # last entry in list for unit in args: if not ref_unit.equivalent(unit): raise TypeError((f"Units {repr(ref_unit)} and " f"{repr(unit)} are inequivalent.")) args.append(ref_unit) return args
[docs] def relative_tol(x, y): """HACK to return ``max(|x-y|/x, |x-y|/y)`` for unit-ful quantities *x*, *y* with equivalent units. Vulnerable to underflow in principle. *x* and *y* are coerced to :class:`Units` objects via :func:`to_cfunits`. """ x, y = to_equivalent_units(x, y) tol_1 = Units.conform(1.0, x, y) # = float(x/y) tol_2 = Units.conform(1.0, y, x) # = float(y/x) return max(abs(tol_1 - 1.0), abs(tol_2 - 1.0))
[docs] def units_equivalent(*args): """Returns True if and only if all unit-ful quantities in *args* are physically equivalent: they represent the same physical quantity, up to a multiplicative conversion factor. *args* are coerced to :class:`Units` objects via :func:`to_cfunits`. """ args = to_cfunits(*args) ref_unit = args.pop() return all(ref_unit.equivalent(unit) for unit in args)
[docs] def units_reftime_base_eq(*args): """Returns True if and only if all unit-ful quantities in *args* are physically equivalent: they represent the same physical quantity, up to a multiplicative conversion factor. *args* are coerced to :class:`Units` objects via :func:`to_cfunits`. """ args = to_cfunits(*args) ref_unit = args.pop() return all(ref_unit.reftime_base_eq(unit) for unit in args)
[docs] def units_equal(*args, rtol=None): """Returns True if and only if all unit-ful quantities in *args* are strictly equal (:func:`units_equivalent` is True and :func:`conversion_factor` = 1). . note:: rtol, atol tolerances on floating-point equality are not currently implemented in cfunits, so we use :func:`relative_tol`. *args* are coerced to :class:`Units` objects via :func:`to_cfunits`. """ args = to_cfunits(*args) ref_unit = args.pop() if rtol is None: # no tolerances: comparing units w/o quantities (or integer quantities) return all(ref_unit.equals(unit) for unit in args) else: for unit in args: try: if not (relative_tol(ref_unit, unit) <= rtol): return False # outside tolerance except TypeError: return False # inequivalent units return True
[docs] def conversion_factor(source_unit, dest_unit): """Return floating point factor which implements a given unit conversion. Defined so that (conversion factor) * (quantity in *source_units*) = (quantity in *dest_units*). *source_unit*, *dest_unit* are coerced to :class:`Units` objects via :func:`to_cfunits`. """ if str(source_unit) == str(dest_unit): return 1.0 # bypass function if the units have the same string allowing units like '0-1' to be used source_unit, dest_unit = to_equivalent_units(source_unit, dest_unit) return Units.conform(1.0, source_unit, dest_unit)
# --------------------------------------------------------------------
[docs] def convert_scalar_coord(coord, dest_units: str, log=_log): """Given scalar coordinate *coord*, return the appropriate scalar value in new units *dest_units*. """ assert hasattr(coord, 'is_scalar') and coord.is_scalar, 'coord is missing is_scalar attribute or is_scalar = False' if not units_equal(coord.units, dest_units): # convert units of scalar value to convention's coordinate's units dest_value = coord.value * conversion_factor(coord.units, dest_units) log.debug("Converted %s %s %s slice of '%s' to %s %s.", coord.value, coord.units, coord.axis, coord.name, dest_value, dest_units ) else: # identical units log.debug("Copied value of %s slice (=%s %s) of '%s' (identical units).", coord.axis, coord.value, coord.units, coord.name ) dest_value = coord.value return dest_value
[docs] def convert_dataarray(ds, da_name: str, src_unit=None, dest_unit=None, log=_log): """Wrapper for cfunits `conform() <https://ncas-cms.github.io/cfunits/generated/cfunits.Units.conform.html#cfunits.Units.conform>`__ that does unit conversion in-place on a member of an xarray Dataset, updating its units attribute. Args: ds (Dataset): xarray Dataset containing the DataArray to convert. da_name (str): variable name or standard_name of the associated DataArray to do the unit conversion on. src_unit: Current units of *da_name*. Coerced to a :class:`Units` object via :func:`to_cfunits`. Optional; if not given this is populated from the ``units`` attribute of *da_name*, if it exists. dest_unit: Desired units for *da_name*. Coerced to a :class:`Units` object via :func:`to_cfunits`. Raises: ValueError: if *da_name* not in *ds*. TypeError: if *src_unit* or *dest_unit* not correctly defined, or if *da_name* lacks units metadata. Returns: Dataset *ds*, with *da_name* modified in-place. """ da = ds.get(da_name, None) search_attrs = ['standard_name', 'long_name'] if da is None: # search attributes for standard_name or long_name that matches input name ds_vars = [ds.variables, ds.coords.variables] merged = frozenset().union(*ds_vars) # try to find exact matches to input name in standard_name and long_name attributes for var in merged: dset = ds.get(var) for attr in search_attrs: att = dset.attrs.get(attr, None) if isinstance(att, str): if att == da_name: da = dset da_name = var break # try to find approximate matches to input name in standard_name and long_name attributes if da is None: da_name_words = re.split('[-, _:;]', da_name) for var in merged: dset = ds.get(var) for attr in search_attrs: att_value = dset.attrs.get(attr, None) if isinstance(att_value, str): att_words = re.split('[-, _:;]', att_value) name_matches = list(set(da_name_words) & set(att_words)) if len(name_matches) >= min(len(da_name_words), len(att_words)): log.info("Found approximate match for %s in dataset %s attribute %s", da_name, attr, att_value) da = dset da_name = var break if da is None: raise ValueError(f"convert_dataarray: '{da_name}' not found in dataset.") if src_unit is None: try: src_unit = da.attrs['units'] except KeyError: raise TypeError((f"convert_dataarray: 'units' attribute not defined " f"on {da_name}.")) if dest_unit is None: raise TypeError((f"convert_dataarray: dest_unit not given for unit " "conversion on {da_name}.")) std_name = "" # update standard_name entry to long_name if it is undefined if 'standard_name' in da.attrs: if isinstance(da.attrs['standard_name'], str): std_name = f"{da.attrs['standard_name']}" elif 'long_name' in da.attrs: std_name = f"{da.attrs['long_name']}" ds[da_name].attrs['standard_name'] = std_name.replace(' ', '_') # udunits does not recognize mb == hPa, so hardcode correction if src_unit == 'mb': ds[da_name].attrs['units'] = 'hPa' src_unit = 'hPa' if dest_unit == 'mb': dest_unit = 'hPa' if units_equal(src_unit, dest_unit): log.debug(("Source, dest units of '%s'%s identical (%s); no conversion " "done."), da.name, std_name, dest_unit) return ds log.debug("Convert units of '%s'%s from '%s' to '%s'.", da.name, std_name, src_unit, dest_unit ) da_attrs = da.attrs.copy() fac = conversion_factor(src_unit, dest_unit) ds = ds.assign({da_name: fac * ds[da_name]}) ds[da_name].attrs = da_attrs ds[da_name].attrs['units'] = str(dest_unit) return ds