src.data_manager module

Base classes implementing logic for querying, fetching and preprocessing model data requested by the PODs; see Data layer: Overview.

class src.data_manager.AbstractQueryMixin[source]

Bases: abc.ABC

abstract query_dataset(var)[source]

Sets data attribute on var or raises an exception.

setup_query()[source]

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

pre_query_hook(vars)[source]

Called before querying the presence of a new batch of variables.

set_experiment()[source]

Called after querying the presence of a new batch of variables, to filter or otherwise ensure that the returned DataKeys for all variables come from the same experimental run of the model, by setting the status attribute of those DataKeys to ACTIVE.

post_query_hook(vars)[source]

Called after set_experiment(), following each query of a new batch of variables.

tear_down_query()[source]

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

__init__()

Initialize self. See help(type(self)) for accurate signature.
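The query contract above can be illustrated with a minimal, self-contained sketch. It does not import the framework: QueryMixinSketch is a stand-in that mirrors the documented hooks, and ToyVar, InMemoryQuery and the dict used as a catalog are hypothetical. setup_query() acquires a resource once, query_dataset() either sets var.data or raises, and tear_down_query() releases the resource.

```python
import abc
from dataclasses import dataclass, field


class QueryMixinSketch(abc.ABC):
    """Stand-in mirroring AbstractQueryMixin's hooks (not the real class)."""

    @abc.abstractmethod
    def query_dataset(self, var):
        """Set var.data or raise an exception."""

    def setup_query(self):
        """Called once before querying starts."""

    def tear_down_query(self):
        """Called once after querying ends."""


@dataclass
class ToyVar:
    # Hypothetical stand-in for a VarlistEntry.
    name: str
    data: dict = field(default_factory=dict)


class InMemoryQuery(QueryMixinSketch):
    """Hypothetical query mixin backed by a dict used as the catalog."""

    def __init__(self, catalog):
        self._catalog = catalog
        self._connected = False

    def setup_query(self):
        # Stands in for opening a database or remote filesystem connection.
        self._connected = True

    def tear_down_query(self):
        # Stands in for closing that connection.
        self._connected = False

    def query_dataset(self, var):
        if not self._connected:
            raise RuntimeError("setup_query() has not been called")
        if var.name not in self._catalog:
            raise KeyError(f"no data found for {var.name!r}")
        # Hypothetical layout: experiment label -> paths found for the variable.
        var.data = {"expt_0": list(self._catalog[var.name])}


query = InMemoryQuery({"pr": ["/data/pr_1980.nc"]})
query.setup_query()
pr = ToyVar("pr")
query.query_dataset(pr)
query.tear_down_query()
```

Because query_dataset() is declared abstract, the stand-in base class cannot be instantiated directly, which is the same guarantee abc.ABC provides for the documented mixin.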

class src.data_manager.AbstractFetchMixin[source]

Bases: abc.ABC

abstract fetch_dataset(var, data_key)[source]

Fetches data corresponding to data_key. Populates the local_data attribute of data_key with a list of identifiers for the successfully fetched data (e.g., paths to locally downloaded copies of the data).

setup_fetch()[source]

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

pre_fetch_hook(vars)[source]

Called before fetching each batch of query results.

post_fetch_hook(vars)[source]

Called after fetching each batch of query results.

tear_down_fetch()[source]

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

__init__()

Initialize self. See help(type(self)) for accurate signature.

class src.data_manager.AbstractDataSource(*args, **kwargs)[source]

Bases: src.data_manager.AbstractQueryMixin, src.data_manager.AbstractFetchMixin

pre_query_and_fetch_hook()[source]

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

post_query_and_fetch_hook()[source]

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

abstract fetch_dataset(var, data_key)

Fetches data corresponding to data_key. Populates the local_data attribute of data_key with a list of identifiers for the successfully fetched data (e.g., paths to locally downloaded copies of the data).

post_fetch_hook(vars)

Called after fetching each batch of query results.

post_query_hook(vars)

Called after set_experiment(), following each query of a new batch of variables.

pre_fetch_hook(vars)

Called before fetching each batch of query results.

pre_query_hook(vars)

Called before querying the presence of a new batch of variables.

abstract query_dataset(var)

Sets data attribute on var or raises an exception.

set_experiment()

Called after querying the presence of a new batch of variables, to filter or otherwise ensure that the returned DataKeys for all variables come from the same experimental run of the model, by setting the status attribute of those DataKeys to ACTIVE.

setup_fetch()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

setup_query()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

tear_down_fetch()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

tear_down_query()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

class src.data_manager.DataKeyBase(*args, **kwargs)[source]

Bases: src.core.MDTFObjectBase

name: str = sentinel.Mandatory
value: Any = sentinel.Mandatory
expt_key: Any = None
local_data: list
__post_init__()[source]
abstract remote_data()[source]

Returns paths, urls, etc. to be used as input to a fetch_data() method to specify how this dataset is fetched.

__init__(_id: src.util.basic.MDTF_ID = None, _parent: Any = sentinel.Mandatory, status: src.core.ObjectStatus = <ObjectStatus.NOTSET>, value: Any = sentinel.Mandatory, expt_key: Any = None, local_data: list = <factory>) → None

Initialize self. See help(type(self)) for accurate signature.

property active
child_deactivation_handler(child, exc)
child_status_update(exc=None)
deactivate(exc, level=None)
property failed
property full_name
iter_children(child_type=None, status=None, status_neq=None)

Generator iterating over child objects associated with this object.

Parameters
  • status – None or ObjectStatus, default None. If None, iterates over all child objects, regardless of status. If an ObjectStatus value is passed, only iterates over child objects with that status.

  • status_neq – None or ObjectStatus, default None. If set, iterates over child objects which don’t have the given status. If status is set, this setting is ignored.

status = 1
class src.data_manager.DataSourceAttributesBase(CASENAME: str = sentinel.Mandatory, FIRSTYR: str = sentinel.Mandatory, LASTYR: str = sentinel.Mandatory, CASE_ROOT_DIR: str = '', convention: str = '', log: dataclasses.InitVar = <Logger src.data_manager (WARNING)>)[source]

Bases: object

Class defining attributes that any DataSource needs to specify:

  • CASENAME: User-supplied label to identify output of this run of the package.

  • FIRSTYR, LASTYR, date_range: Analysis period, specified as a closed interval (i.e. running from 1 Jan of FIRSTYR through 31 Dec of LASTYR).

  • CASE_ROOT_DIR: Root directory containing input model data. Different DataSources may interpret this differently.

  • convention: Name of the variable naming convention used by the source of model data.

CASENAME: str = sentinel.Mandatory
FIRSTYR: str = sentinel.Mandatory
LASTYR: str = sentinel.Mandatory
date_range: util.DateRange
CASE_ROOT_DIR: str = ''
convention: str = ''
log: dataclasses.InitVar = <Logger src.data_manager (WARNING)>
__post_init__(log=<Logger>)[source]
__init__(CASENAME: str = sentinel.Mandatory, FIRSTYR: str = sentinel.Mandatory, LASTYR: str = sentinel.Mandatory, CASE_ROOT_DIR: str = '', convention: str = '', log: dataclasses.InitVar = <Logger src.data_manager (WARNING)>) → None

Initialize self. See help(type(self)) for accurate signature.
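The closed analysis interval described for FIRSTYR and LASTYR can be sketched with the standard library. This assumes plain datetime.date endpoints in place of the framework's util.DateRange; analysis_period() and in_period() are hypothetical helpers, not part of the package.

```python
import datetime


def analysis_period(firstyr: str, lastyr: str) -> tuple:
    """Hypothetical helper: (start, end) of the closed analysis interval.

    FIRSTYR and LASTYR are strings, as in DataSourceAttributesBase; the
    framework itself stores the result as a util.DateRange.
    """
    start = datetime.date(int(firstyr), 1, 1)   # 1 Jan of FIRSTYR
    end = datetime.date(int(lastyr), 12, 31)    # 31 Dec of LASTYR
    if end < start:
        raise ValueError(f"LASTYR ({lastyr}) precedes FIRSTYR ({firstyr})")
    return start, end


def in_period(day: datetime.date, period: tuple) -> bool:
    start, end = period
    # Closed interval: both endpoints are included.
    return start <= day <= end
```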

class src.data_manager.PodVarTuple(pod, var)

Bases: tuple

__init__()

Initialize self. See help(type(self)) for accurate signature.

property pod
property var
class src.data_manager.DataSourceBase(*args, **kwargs)[source]

Bases: src.core.MDTFObjectBase, src.util.logs.CaseLoggerMixin, src.data_manager.AbstractDataSource

Base class for handling the data needs of PODs. Executes queries for the requested model data against the remote data source, fetches the required data locally, preprocesses it, and performs cleanup/formatting of the POD’s output.

property full_name
iter_vars(active=None, pod_active=None)[source]

Iterator over all VarlistEntries (grandchildren) associated with this case. Yields PodVarTuples (namedtuples) of the Diagnostic and VarlistEntry objects corresponding to the POD and its variable, respectively.

Parameters
  • active – bool or None, default None. Selects the subset of VarlistEntries returned in the namedtuples:

    • active = True: only iterate over currently active VarlistEntries.

    • active = False: only iterate over inactive VarlistEntries (VarlistEntries which have either failed or are currently unused alternate variables).

    • active = None: iterate over both active and inactive VarlistEntries.

  • pod_active – bool or None, default None. Same as active, but filtering the PODs that are selected.
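The filtering semantics of the active and pod_active parameters can be sketched as follows. PodVarTuple is defined as a namedtuple with the pod and var fields documented for it; ToyPod, ToyEntry and their active/varlist attributes are hypothetical stand-ins for Diagnostic and VarlistEntry, so this is an illustration of the documented behavior rather than the actual method.

```python
from collections import namedtuple
from dataclasses import dataclass, field

PodVarTuple = namedtuple("PodVarTuple", ["pod", "var"])


@dataclass
class ToyEntry:
    # Hypothetical stand-in for a VarlistEntry.
    name: str
    active: bool = True


@dataclass
class ToyPod:
    # Hypothetical stand-in for a Diagnostic.
    name: str
    active: bool = True
    varlist: list = field(default_factory=list)


def _keep(obj, flag):
    # flag=None keeps everything; True/False keeps objects with that status.
    return flag is None or obj.active == flag


def iter_vars(pods, active=None, pod_active=None):
    for pod in pods:
        if not _keep(pod, pod_active):
            continue
        for var in pod.varlist:
            if _keep(var, active):
                yield PodVarTuple(pod, var)


def iter_vars_only(pods, active=None):
    # Mirrors the convenience wrapper: drop the POD, keep the VarlistEntry.
    return (pv.var for pv in iter_vars(pods, active=active))
```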

iter_vars_only(active=None)[source]

Convenience wrapper for iter_vars() that returns only the VarlistEntry objects (grandchildren) from all PODs in this DataSource.

setup()[source]
setup_pod(pod)[source]

Update POD with information that only becomes available after DataManager and Diagnostic have been configured (i.e., known only at runtime, not from settings.jsonc).

Could arguably be moved into Diagnostic’s init, at the cost of dependency inversion.

setup_var(pod, v)[source]

Update VarlistEntry fields with information that only becomes available after DataManager and Diagnostic have been configured (i.e., known only at runtime, not from settings.jsonc).

Could arguably be moved into VarlistEntry’s init, at the cost of dependency inversion.

variable_dest_path(pod, var)[source]

Returns the absolute path of the POD’s preprocessed, local copy of the file containing the requested dataset. Files not following this convention won’t be found by the POD.

data_key(value, expt_key=None, status=None)[source]

Constructor for an instance of DataKeyBase that’s used by this DataSource.

is_fetch_necessary(d_key, var=None)[source]
child_deactivation_handler(child, child_exc)[source]

When a DataKey (child) has been deactivated during query or fetch, log a message on all VarlistEntries using it, and deactivate any VarlistEntries with no remaining viable DataKeys.
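The bookkeeping behind this handler can be sketched with plain sets. This is a hypothetical illustration, not the framework's code: it assumes each variable is described only by the set of DataKey ids it depends on, and it uses the standard logging module in place of the per-object logs the framework maintains.

```python
import logging

log = logging.getLogger("sketch")


def handle_key_deactivation(failed_key, var_keys, inactive_keys, exc):
    """Hypothetical sketch of child_deactivation_handler's bookkeeping.

    var_keys: dict mapping a variable name to the set of DataKey ids it uses.
    inactive_keys: set of DataKey ids already deactivated (updated in place).
    Returns the variables that have no viable DataKeys left.
    """
    inactive_keys.add(failed_key)
    newly_failed = []
    for var_name, keys in var_keys.items():
        if failed_key not in keys:
            continue
        # Log on every variable that used the failed DataKey.
        log.info("%s: data key %s deactivated (%s)", var_name, failed_key, exc)
        if keys <= inactive_keys:
            # Every DataKey this variable relied on has failed.
            newly_failed.append(var_name)
    return newly_failed
```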

query_data()[source]
select_data()[source]
fetch_data()[source]
preprocess_data()[source]

Hook to run the preprocessing function on all variables.

request_data()[source]

Top-level method to iteratively query, fetch and preprocess all data requested by PODs, switching to alternate requested data as needed.
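One plausible call order for the hooks documented in this module can be sketched with a recorder class. This is an assumption, not the actual request_data(): it omits preprocessing and the switch to alternate variables, assumes the once-per-run pre_/post_query_and_fetch_hook() methods wrap the setup_ and tear_down_ methods, and uses a try/finally so connections are released even if a hook raises. HookOrderSketch is hypothetical.

```python
class HookOrderSketch:
    """Hypothetical stand-in: every hook just records its own name."""

    def __init__(self, batches):
        self.batches = batches  # list of batches, each a list of variable names
        self.calls = []

    def _recorder(name):
        # Runs while the class body executes; builds one recording method.
        def method(self, *args):
            self.calls.append(name)
        method.__name__ = name
        return method

    setup_query = _recorder("setup_query")
    setup_fetch = _recorder("setup_fetch")
    tear_down_query = _recorder("tear_down_query")
    tear_down_fetch = _recorder("tear_down_fetch")
    pre_query_hook = _recorder("pre_query_hook")
    query_dataset = _recorder("query_dataset")
    set_experiment = _recorder("set_experiment")
    post_query_hook = _recorder("post_query_hook")
    pre_fetch_hook = _recorder("pre_fetch_hook")
    fetch_dataset = _recorder("fetch_dataset")
    post_fetch_hook = _recorder("post_fetch_hook")
    del _recorder

    # Assumption: the once-per-run hooks wrap the setup_/tear_down_ methods.
    def pre_query_and_fetch_hook(self):
        self.setup_query()
        self.setup_fetch()

    def post_query_and_fetch_hook(self):
        self.tear_down_query()
        self.tear_down_fetch()

    def request_data(self):
        self.pre_query_and_fetch_hook()
        try:
            for batch in self.batches:
                self.pre_query_hook(batch)
                for var in batch:
                    self.query_dataset(var)
                self.set_experiment()
                self.post_query_hook(batch)
                self.pre_fetch_hook(batch)
                for var in batch:
                    self.fetch_dataset(var, None)
                self.post_fetch_hook(batch)
        finally:
            # Connections are released even if a hook raises.
            self.post_query_and_fetch_hook()
```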

query_and_fetch_cleanup(signum=None, frame=None)[source]

Called if framework is terminated abnormally. Not called during normal exit.

__post_init__()
property active
child_status_update(exc=None)
close_log_file(log=True)
deactivate(exc, level=None)
property failed
abstract fetch_dataset(var, data_key)

Fetches data corresponding to data_key. Populates the local_data attribute of data_key with a list of identifiers for the successfully fetched data (e.g., paths to locally downloaded copies of the data).

init_extra_log_handlers()
init_log(log_dir, fmt=None)
iter_children(child_type=None, status=None, status_neq=None)

Generator iterating over child objects associated with this object.

Parameters
  • status – None or ObjectStatus, default None. If None, iterates over all child objects, regardless of status. If an ObjectStatus value is passed, only iterates over child objects with that status.

  • status_neq – None or ObjectStatus, default None. If set, iterates over child objects which don’t have the given status. If status is set, this setting is ignored.

name: str = sentinel.Mandatory
post_fetch_hook(vars)

Called after fetching each batch of query results.

post_query_and_fetch_hook()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

post_query_hook(vars)

Called after set_experiment(), following each query of a new batch of variables.

pre_fetch_hook(vars)

Called before fetching each batch of query results.

pre_query_and_fetch_hook()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

pre_query_hook(vars)

Called before querying the presence of a new batch of variables.

abstract query_dataset(var)

Sets data attribute on var or raises an exception.

set_experiment()

Called after querying the presence of a new batch of variables, to filter or otherwise ensure that the returned DataKeys for all variables come from the same experimental run of the model, by setting the status attribute of those DataKeys to ACTIVE.

setup_fetch()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

setup_query()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

status: src.core.ObjectStatus = 1
tear_down_fetch()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

tear_down_query()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

class src.data_manager.DataFrameDataKey(*args, **kwargs)[source]

Bases: src.data_manager.DataKeyBase

DataKeyBase subclass for use with DataframeQueryDataSourceBase and its child classes. The values stored in the DataKey are row indices of the DataSource's catalog DataFrame; the remote_data() method returns the values in those rows of the catalog column that contains the paths to the remote data.

Note

Because of this implementation, the catalog used by the DataSource must be static. This code could readily be adapted to a dynamic catalog if its schema provided a unique ID number for each row, to take the place of the row index used here.

__post_init__()[source]

The value passed in by DataframeQueryDataSourceBase is the entire DataFrame corresponding to this group of catalog entries. Here we convert it to a tuple of row indices and store that instead.

remote_data()[source]

Returns paths, urls, etc. to be used as input to a fetch_data method to specify how this dataset is fetched.
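The row-index scheme described for DataFrameDataKey can be sketched with pandas. RowIndexKey, its catalog field and the "path" column name are hypothetical; the sketch shows why the catalog must be static: the stored row labels are only meaningful while the catalog DataFrame they index is unchanged.

```python
from dataclasses import dataclass

import pandas as pd


@dataclass
class RowIndexKey:
    """Hypothetical stand-in for DataFrameDataKey.

    `value` arrives as the DataFrame slice for one group of catalog entries;
    __post_init__ replaces it with the tuple of that slice's row labels.
    """
    catalog: pd.DataFrame
    value: object
    remote_data_col: str = "path"

    def __post_init__(self):
        if isinstance(self.value, pd.DataFrame):
            self.value = tuple(self.value.index)

    def remote_data(self):
        # Look the rows back up in the (static) catalog.
        return self.catalog.loc[list(self.value), self.remote_data_col].tolist()
```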

__init__(_id: src.util.basic.MDTF_ID = None, _parent: Any = sentinel.Mandatory, status: src.core.ObjectStatus = <ObjectStatus.NOTSET>, value: Any = sentinel.Mandatory, expt_key: Any = None, local_data: list = <factory>) → None

Initialize self. See help(type(self)) for accurate signature.

property active
child_deactivation_handler(child, exc)
child_status_update(exc=None)
deactivate(exc, level=None)
expt_key: Any = None
property failed
property full_name
iter_children(child_type=None, status=None, status_neq=None)

Generator iterating over child objects associated with this object.

Parameters
  • status – None or ObjectStatus, default None. If None, iterates over all child objects, regardless of status. If an ObjectStatus value is passed, only iterates over child objects with that status.

  • status_neq – None or ObjectStatus, default None. If set, iterates over child objects which don’t have the given status. If status is set, this setting is ignored.

name: str = sentinel.Mandatory
status = 1
value: Any = sentinel.Mandatory
local_data
class src.data_manager.DataFrameQueryColumnGroup(key_cols=None, derived_cols=None)[source]

Bases: object

Class wrapping a set of catalog (DataFrame) column names used by DataframeQueryDataSourceBase in selecting experiment attributes of a given scope (case-wide, pod-wide or var-wide).

One component of DataframeQueryColumnSpec.

__init__(key_cols=None, derived_cols=None)[source]

Initialize self. See help(type(self)) for accurate signature.

expt_key(df, idx=None)[source]

Returns string-valued key for use in grouping the rows of df by experiment.

Note

We can’t just do a .groupby on column names, because pandas attempts to coerce DateFrequency to a timedelta64, which overflows for static DateFrequency. There doesn’t seem to be a way to disable this type coercion, so rows are grouped by a string-valued key instead.
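The workaround of grouping by a string-valued key can be sketched as below. string_expt_key(), the "|" separator and the column names are hypothetical; the point is that every value is converted with str() first, so pandas only ever groups plain strings.

```python
import pandas as pd


def string_expt_key(df: pd.DataFrame, cols: list, sep: str = "|") -> pd.Series:
    """Hypothetical helper returning one string key per row of df.

    Values are converted with str() before grouping, so pandas never
    attempts to coerce the original objects (such as a custom frequency
    class) to another dtype.
    """
    keys = [
        sep.join(str(v) for v in row)
        for row in df[cols].itertuples(index=False, name=None)
    ]
    return pd.Series(keys, index=df.index, dtype=object)


catalog = pd.DataFrame({
    "source_id": ["CM4", "CM4", "ESM4"],
    "frequency": ["day", "day", "mon"],
    "path": ["a.nc", "b.nc", "c.nc"],
})
keys = string_expt_key(catalog, ["source_id", "frequency"])
# Group the catalog rows by experiment using the string keys.
groups = {k: list(g["path"]) for k, g in catalog.groupby(keys)}
```

Building the keys with a list comprehension rather than DataFrame.apply keeps the result a Series even when the DataFrame is empty.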

expt_key_func(df)[source]

Function that constructs the appropriate experiment_key column when apply()’ed to the query results DataFrame.

class src.data_manager.DataframeQueryColumnSpec(*args, **kwargs)[source]

Bases: object

  • expt_cols: Catalog columns whose values must be the same for all variables being fetched. This is the most common sense in which we “specify an experiment.”

  • pod_expt_cols: Catalog columns whose values must be the same for each POD, but may differ between PODs. An example could be spatial grid resolution. Defaults to the empty set.

  • var_expt_cols: Catalog columns whose values must “be the same for each variable”, i.e. are irrelevant differences for our purposes but must be constrained to a unique value in order to uniquely specify an experiment. An example is the CMIP6 MIP table: the same variable can appear in multiple MIP tables, but the choice of table isn’t relevant for PODs. Defaults to the empty set.

In addition, there are specially designated column names:

  • remote_data_col: Name of the column in the catalog containing the location of the data for that row (e.g., path to a netCDF file).

  • daterange_col: Name of the column in the catalog containing util.DateRange objects specifying the date range covered by the data for that row. If set to None, we assume this information isn’t available from the catalog and date range selection logic is skipped.

expt_cols: src.data_manager.DataFrameQueryColumnGroup = sentinel.Mandatory
pod_expt_cols: DataFrameQueryColumnGroup
var_expt_cols: DataFrameQueryColumnGroup
remote_data_col: str = None
daterange_col: str = None
property has_date_info
property all_expt_cols

Columns of the DataFrame specifying the experiment. We assume that specifying a valid value for each of the columns in this set uniquely identifies an experiment.

expt_key(df, idx=None)[source]

Returns a tuple of string-valued keys for grouping files by experiment: (<values of expt_cols>, <values of pod_expt_cols>, <values of var_expt_cols>).

__init__(expt_cols: src.data_manager.DataFrameQueryColumnGroup = sentinel.Mandatory, pod_expt_cols: src.data_manager.DataFrameQueryColumnGroup = <factory>, var_expt_cols: src.data_manager.DataFrameQueryColumnGroup = <factory>, remote_data_col: str = None, daterange_col: str = None) → None

Initialize self. See help(type(self)) for accurate signature.

__post_init__(*args, **kwargs)
class src.data_manager.DataframeQueryDataSourceBase(*args, **kwargs)[source]

Bases: src.data_manager.DataSourceBase

DataSource which queries a data catalog made available as a pandas DataFrame, and includes logic for selecting experiment based on column values.

Note

This implementation assumes the catalog is static and loaded locally into memory. As far as we know, the only source of this limitation is that it uses values of the DataFrame’s Index as its DataKeys, instead of storing the complete row contents, so this limitation could be lifted if needed.

TODO: integrate better with general Intake API.

col_spec = <src.util.basic._AbstractAttributePlaceholder object>
abstract property df

Synonym for the DataFrame containing the catalog.

property all_columns
property remote_data_col
check_group_daterange(group_df, expt_key=None, log=<Logger>)[source]

Sort the files found for each experiment by date, verify that the date ranges contained in the files are contiguous in time and that the date range of the files spans the query date range.
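The three checks described here (sort by date, contiguity, coverage of the query range) can be sketched with datetime.date pairs. This assumes each file covers a closed interval at daily resolution; the framework uses util.DateRange, which handles other precisions, and check_date_ranges() is a hypothetical name.

```python
import datetime

ONE_DAY = datetime.timedelta(days=1)


def check_date_ranges(ranges, query_start, query_end):
    """Hypothetical sketch of the checks in check_group_daterange().

    ranges: iterable of (start, end) datetime.date pairs, one per file,
            each a closed interval at daily resolution (an assumption).
    Returns the ranges sorted by start date; raises ValueError on a gap,
    an overlap, or incomplete coverage of [query_start, query_end].
    """
    ordered = sorted(ranges)
    if not ordered:
        raise ValueError("no files found")
    for (_, prev_end), (next_start, _) in zip(ordered, ordered[1:]):
        # Contiguous means each file starts the day after the previous ends.
        if next_start != prev_end + ONE_DAY:
            raise ValueError(f"non-contiguous: {prev_end} -> {next_start}")
    if ordered[0][0] > query_start or ordered[-1][1] < query_end:
        raise ValueError("files do not span the query date range")
    return ordered
```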

query_dataset(var)[source]

Find all rows of the catalog matching relevant attributes of the DataSource and of the variable (VarlistEntry). Group these by experiment, and for each experiment make the corresponding DataFrameDataKey and store it in var’s data attribute. Specifically, the data attribute is a dict mapping experiments (labeled by experiment_keys) to the data found for that variable by this query (labeled by the DataKeys).

get_expt_key(scope, obj, parent_id=None)[source]

Set experiment attributes with case, pod or variable scope. Given obj, construct a DataFrame of experiment attributes that are found in the queried data for all variables in obj.

If more than one choice of experiment is possible, call DataSource-specific heuristics in resolve_func to choose between them.

set_expt_key(obj, expt_key)[source]
set_experiment()[source]

Ensure that all data we’re about to fetch comes from the same experiment. If data from multiple experiments was returned by the query that just finished, either employ data source-specific heuristics to select one or return an error.
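The case-wide selection step can be sketched with set intersection. choose_common_experiment() and its resolve argument are hypothetical; resolve stands in for a DataSource-specific tiebreaker such as resolve_expt(), and the pod- and variable-scope steps of the real method are omitted.

```python
def choose_common_experiment(found, resolve=None):
    """Hypothetical sketch of the case-wide step of set_experiment().

    found: dict mapping each variable name to the set of experiment keys
           its query returned.
    resolve: optional tiebreaker taking a sorted list of candidate keys and
             returning one of them; stands in for resolve_expt().
    """
    if not found:
        raise ValueError("no variables were queried")
    # Only experiments that provide every variable are viable.
    common = set.intersection(*(set(keys) for keys in found.values()))
    if not common:
        raise ValueError("no single experiment provides all variables")
    candidates = sorted(common)
    if len(candidates) == 1:
        return candidates[0]
    if resolve is None:
        raise ValueError(f"ambiguous experiment: {candidates}")
    return resolve(candidates)
```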

resolve_expt(expt_df, obj)[source]

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

resolve_pod_expt(expt_df, obj)[source]

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

resolve_var_expt(expt_df, obj)[source]

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

__post_init__()
property active
child_deactivation_handler(child, child_exc)

When a DataKey (child) has been deactivated during query or fetch, log a message on all VarlistEntries using it, and deactivate any VarlistEntries with no remaining viable DataKeys.

child_status_update(exc=None)
close_log_file(log=True)
data_key(value, expt_key=None, status=None)

Constructor for an instance of DataKeyBase that’s used by this DataSource.

deactivate(exc, level=None)
property failed
fetch_data()
abstract fetch_dataset(var, data_key)

Fetches data corresponding to data_key. Populates the local_data attribute of data_key with a list of identifiers for the successfully fetched data (e.g., paths to locally downloaded copies of the data).

property full_name
init_extra_log_handlers()
init_log(log_dir, fmt=None)
is_fetch_necessary(d_key, var=None)
iter_children(child_type=None, status=None, status_neq=None)

Generator iterating over child objects associated with this object.

Parameters
  • status – None or ObjectStatus, default None. If None, iterates over all child objects, regardless of status. If an ObjectStatus value is passed, only iterates over child objects with that status.

  • status_neq – None or ObjectStatus, default None. If set, iterates over child objects which don’t have the given status. If status is set, this setting is ignored.

iter_vars(active=None, pod_active=None)

Iterator over all VarlistEntries (grandchildren) associated with this case. Yields PodVarTuples (namedtuples) of the Diagnostic and VarlistEntry objects corresponding to the POD and its variable, respectively.

Parameters
  • active – bool or None, default None. Selects the subset of VarlistEntries returned in the namedtuples:

    • active = True: only iterate over currently active VarlistEntries.

    • active = False: only iterate over inactive VarlistEntries (VarlistEntries which have either failed or are currently unused alternate variables).

    • active = None: iterate over both active and inactive VarlistEntries.

  • pod_active – bool or None, default None. Same as active, but filtering the PODs that are selected.

iter_vars_only(active=None)

Convenience wrapper for iter_vars() that returns only the VarlistEntry objects (grandchildren) from all PODs in this DataSource.

name: str = sentinel.Mandatory
post_fetch_hook(vars)

Called after fetching each batch of query results.

post_query_and_fetch_hook()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

post_query_hook(vars)

Called after set_experiment(), following each query of a new batch of variables.

pre_fetch_hook(vars)

Called before fetching each batch of query results.

pre_query_and_fetch_hook()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

pre_query_hook(vars)

Called before querying the presence of a new batch of variables.

preprocess_data()

Hook to run the preprocessing function on all variables.

query_and_fetch_cleanup(signum=None, frame=None)

Called if framework is terminated abnormally. Not called during normal exit.

query_data()
request_data()

Top-level method to iteratively query, fetch and preprocess all data requested by PODs, switching to alternate requested data as needed.

select_data()
setup()
setup_fetch()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

setup_pod(pod)

Update POD with information that only becomes available after DataManager and Diagnostic have been configured (i.e., known only at runtime, not from settings.jsonc).

Could arguably be moved into Diagnostic’s init, at the cost of dependency inversion.

setup_query()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

setup_var(pod, v)

Update VarlistEntry fields with information that only becomes available after DataManager and Diagnostic have been configured (i.e., known only at runtime, not from settings.jsonc).

Could arguably be moved into VarlistEntry’s init, at the cost of dependency inversion.

status: src.core.ObjectStatus = 1
tear_down_fetch()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

tear_down_query()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

variable_dest_path(pod, var)

Returns the absolute path of the POD’s preprocessed, local copy of the file containing the requested dataset. Files not following this convention won’t be found by the POD.

class src.data_manager.OnTheFlyFilesystemQueryMixin(*args, **kwargs)[source]

Bases: object

Mixin that creates an intake_esm.esm_datastore catalog by using a regex (_FileRegexClass) to query the existence of data files on a remote filesystem.

For the purposes of this class, all data attributes are inferred only from file and directory naming conventions: the contents of the files are not examined (i.e., the data files are not read) until they are fetched to a local filesystem.

Note

At the time of writing, the filename parsing functionality included in intake is too limited to correctly parse our use cases, which is why we use the RegexPattern class instead.

CATALOG_DIR = <src.util.basic._AbstractAttributePlaceholder object>
property df
property remote_data_col

Name of the column in the catalog containing the path to the remote data file.

abstract generate_catalog()[source]

Method (to be implemented by child classes) which returns the data catalog as a Pandas DataFrame. One of the columns of the DataFrame must have the name returned by remote_data_col() and contain paths to the files.

setup_query()[source]

Generate an intake_esm catalog of files found in CATALOG_DIR. Attributes of files listed in the catalog (columns of the DataFrame) are taken from the match groups (fields) of the class’s _FileRegexClass.

__init__()

Initialize self. See help(type(self)) for accurate signature.

class src.data_manager.OnTheFlyDirectoryHierarchyQueryMixin(*args, **kwargs)[source]

Bases: src.data_manager.OnTheFlyFilesystemQueryMixin

Mixin that creates an intake_esm.esm_datastore catalog on-the-fly by crawling a directory hierarchy and populating catalog entry attributes by running a regex (_FileRegexClass) against the paths of files in the directory hierarchy.

iter_files()[source]

Generator that yields instances of _FileRegexClass generated from relative paths of files in CATALOG_DIR. Only paths that match the regex in _FileRegexClass are returned.

generate_catalog()[source]

Crawl the directory hierarchy via iter_files() and return the set of found files as rows in a Pandas DataFrame.
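The crawl-and-parse approach can be sketched with os.walk and a regex with named groups, where each named group becomes a catalog column. The path convention <model>/<frequency>/<variable>_<years>.nc, FILE_REGEX and both functions are hypothetical; the real mixin uses CATALOG_DIR and the class's _FileRegexClass instead.

```python
import os
import re

import pandas as pd

# Hypothetical path convention: <model>/<frequency>/<variable>_<years>.nc
FILE_REGEX = re.compile(
    r"(?P<model>[^/]+)/(?P<frequency>[^/]+)/"
    r"(?P<variable>\w+?)_(?P<years>\d{4}-\d{4})\.nc$"
)


def iter_files(catalog_dir):
    """Yield one dict per file whose relative path matches FILE_REGEX."""
    for root, _dirs, files in os.walk(catalog_dir):
        for fname in files:
            full = os.path.join(root, fname)
            rel = os.path.relpath(full, catalog_dir).replace(os.sep, "/")
            match = FILE_REGEX.match(rel)
            if match is None:
                continue  # paths that don't match the regex are skipped
            # Named match groups become catalog columns; keep the full path too.
            yield {**match.groupdict(), "path": full}


def generate_catalog(catalog_dir):
    columns = ["model", "frequency", "variable", "years", "path"]
    return pd.DataFrame(list(iter_files(catalog_dir)), columns=columns)
```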

CATALOG_DIR = <src.util.basic._AbstractAttributePlaceholder object>
__init__()

Initialize self. See help(type(self)) for accurate signature.

property df
property remote_data_col

Name of the column in the catalog containing the path to the remote data file.

setup_query()

Generate an intake_esm catalog of files found in CATALOG_DIR. Attributes of files listed in the catalog (columns of the DataFrame) are taken from the match groups (fields) of the class’s _FileRegexClass.

class src.data_manager.FileGlobTuple(name, glob, attrs)

Bases: tuple

Class representing one file glob pattern. attrs is a dict containing the data catalog values that will be associated with all files found using glob. name is used for logging only.

__init__()

Initialize self. See help(type(self)) for accurate signature.

property attrs
property glob
property name
class src.data_manager.OnTheFlyGlobQueryMixin(*args, **kwargs)[source]

Bases: src.data_manager.OnTheFlyFilesystemQueryMixin

Mixin that creates an intake_esm.esm_datastore catalog on-the-fly by searching for files with (python’s implementation of) the shell glob syntax.

We still invoke _FileRegexClass to parse the paths, but the expected use case is that this will be the trivial regex (matching everything, with no labeled match groups), since the file selection logic is being handled by the globs. If you know your data is stored according to some relevant structure, you should use OnTheFlyDirectoryHierarchyQueryMixin instead.

abstract iter_globs()[source]

Iterator returning FileGlobTuple instances. The generated catalog contains the union of the files found by each of the globs.

iter_files(path_glob)[source]

Generator that yields instances of _FileRegexClass generated from relative paths of files in CATALOG_DIR. Only paths that match the regex in _FileRegexClass are returned.

generate_catalog()[source]

Build the catalog from the files returned by the set of globs provided by iter_globs().
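Building a catalog as the union of several globs can be sketched with the standard glob module. FileGlobTuple is defined with the (name, glob, attrs) fields documented for it; generate_catalog() here is a hypothetical stand-alone function, and the policy that a file matched by several globs keeps the first glob's attrs is an assumption of this sketch.

```python
import glob
import os
from collections import namedtuple

import pandas as pd

FileGlobTuple = namedtuple("FileGlobTuple", ["name", "glob", "attrs"])


def generate_catalog(catalog_dir, globs):
    """Hypothetical sketch: union of the files matched by each FileGlobTuple.

    Every file found by a glob gets that glob's attrs as catalog values;
    `name` would only be used for logging.
    """
    rows = {}
    for g in globs:
        pattern = os.path.join(catalog_dir, g.glob)
        for path in sorted(glob.glob(pattern, recursive=True)):
            # Assumption: a file matched by several globs keeps the first attrs.
            rows.setdefault(path, {**g.attrs, "path": path})
    return pd.DataFrame(list(rows.values()))
```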

CATALOG_DIR = <src.util.basic._AbstractAttributePlaceholder object>
__init__()

Initialize self. See help(type(self)) for accurate signature.

property df
property remote_data_col

Name of the column in the catalog containing the path to the remote data file.

setup_query()

Generate an intake_esm catalog of files found in CATALOG_DIR. Attributes of files listed in the catalog (columns of the DataFrame) are taken from the match groups (fields) of the class’s _FileRegexClass.

class src.data_manager.LocalFetchMixin[source]

Bases: src.data_manager.AbstractFetchMixin

Mixin implementing data fetch for files on a locally mounted filesystem. No data is transferred; we assume that xarray can open the paths directly. Paths are passed through unaltered and set as the variable’s local_data.
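A no-transfer fetch of this kind can be sketched as below. ToyDataKey and LocalFetchSketch are hypothetical stand-ins, and the existence check before setting local_data is an addition of this sketch rather than documented behavior.

```python
import os
from dataclasses import dataclass, field


@dataclass
class ToyDataKey:
    # Hypothetical stand-in for a DataKey: remote paths plus fetched results.
    remote_paths: list
    local_data: list = field(default_factory=list)

    def remote_data(self):
        return self.remote_paths


class LocalFetchSketch:
    """Hypothetical sketch of a no-transfer fetch on a mounted filesystem."""

    def fetch_dataset(self, var, d_key):
        paths = list(d_key.remote_data())
        # Assumption: fail early if a path is not visible on this filesystem.
        missing = [p for p in paths if not os.path.exists(p)]
        if missing:
            raise FileNotFoundError(f"{var}: missing {missing}")
        # Nothing is copied: the remote paths are already readable locally.
        d_key.local_data = paths
```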

fetch_dataset(var, d_key)[source]

Fetches data corresponding to data_key. Populates the local_data attribute of data_key with a list of identifiers for the successfully fetched data (e.g., paths to locally downloaded copies of the data).

__init__()

Initialize self. See help(type(self)) for accurate signature.

post_fetch_hook(vars)

Called after fetching each batch of query results.

pre_fetch_hook(vars)

Called before fetching each batch of query results.

setup_fetch()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

tear_down_fetch()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

class src.data_manager.LocalFileDataSource(*args, **kwargs)[source]

Bases: src.data_manager.OnTheFlyDirectoryHierarchyQueryMixin, src.data_manager.LocalFetchMixin, src.data_manager.DataframeQueryDataSourceBase

DataSource for data stored in a regular directory hierarchy on a locally mounted filesystem. Assumes that the data for each variable may be split across several files by date, with the dates present in their filenames.
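The way LocalFileDataSource is assembled from mixins relies on Python's method resolution order: bases listed first take precedence, so the query and fetch mixins override the defaults inherited through the DataSource base. All class names below are hypothetical toys that mirror only the shape of the documented bases.

```python
class QueryBase:
    def setup_query(self):
        return "QueryBase.setup_query"


class FetchBase:
    def fetch_dataset(self, var, d_key):
        raise NotImplementedError


class DataSourceBaseSketch(QueryBase, FetchBase):
    def request(self):
        return self.setup_query(), self.fetch_dataset("pr", None)


class HierarchyQuerySketch:
    # Like OnTheFlyFilesystemQueryMixin, this mixin derives only from object.
    def setup_query(self):
        return "HierarchyQuerySketch.setup_query"


class LocalFetchSketch(FetchBase):
    def fetch_dataset(self, var, d_key):
        return f"local fetch of {var}"


class LocalFileSourceSketch(HierarchyQuerySketch, LocalFetchSketch, DataSourceBaseSketch):
    """Mixins first, base DataSource last, as in LocalFileDataSource."""
```

Calling LocalFileSourceSketch().request() runs the base class's logic but dispatches setup_query() and fetch_dataset() to the mixins, because they precede DataSourceBaseSketch in LocalFileSourceSketch.__mro__.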

CATALOG_DIR = <src.util.basic._AbstractAttributePlaceholder object>
__post_init__()
property active
property all_columns
check_group_daterange(group_df, expt_key=None, log=<Logger>)

Sort the files found for each experiment by date, verify that the date ranges contained in the files are contiguous in time and that the date range of the files spans the query date range.

child_deactivation_handler(child, child_exc)

When a DataKey (child) has been deactivated during query or fetch, log a message on all VarlistEntries using it, and deactivate any VarlistEntries with no remaining viable DataKeys.

child_status_update(exc=None)
close_log_file(log=True)
col_spec = <src.util.basic._AbstractAttributePlaceholder object>
data_key(value, expt_key=None, status=None)

Constructor for an instance of DataKeyBase that’s used by this DataSource.

deactivate(exc, level=None)
property df

Synonym for the DataFrame containing the catalog.

property failed
fetch_data()
fetch_dataset(var, d_key)

Fetches data corresponding to data_key. Populates the local_data attribute of data_key with a list of identifiers for the successfully fetched data (e.g., paths to locally downloaded copies of the data).

property full_name
generate_catalog()

Crawl the directory hierarchy via iter_files() and return the set of found files as rows in a Pandas DataFrame.

get_expt_key(scope, obj, parent_id=None)

Set experiment attributes with case, POD or variable scope. Given obj, construct a DataFrame of experiment attributes that are found in the queried data for all variables in obj.

If more than one choice of experiment is possible, call DataSource-specific heuristics in resolve_func to choose between them.
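The selection logic can be sketched as a set intersection, assuming the query results are reduced to a mapping from variable name to the set of experiment keys found for it. `choose_expt_key` is a hypothetical name, and `resolve_func` here is any callable that picks one key from a sorted list of candidates; the real method works on DataFrames.

```python
def choose_expt_key(expt_keys_by_var, resolve_func):
    """Hypothetical sketch: `expt_keys_by_var` maps each variable name to the set
    of experiment keys the query found for it. Only keys found for every
    variable are eligible; ties are passed to `resolve_func`."""
    key_sets = list(expt_keys_by_var.values())
    if not key_sets:
        raise ValueError("No variables to check.")
    common = set.intersection(*key_sets)
    if not common:
        raise ValueError("No single experiment provides data for all variables.")
    if len(common) == 1:
        return next(iter(common))
    # More than one candidate: defer to DataSource-specific heuristics.
    return resolve_func(sorted(common))
```

With `{"tas": {"run1", "run2"}, "pr": {"run2", "run3"}}` only `"run2"` is common, so the tiebreaker is never consulted.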

init_extra_log_handlers()
init_log(log_dir, fmt=None)
is_fetch_necessary(d_key, var=None)
iter_children(child_type=None, status=None, status_neq=None)

Generator iterating over child objects associated with this object.

Parameters
  • status – None or ObjectStatus, default None. If None, iterates over all child objects, regardless of status. If an ObjectStatus value is passed, only iterates over child objects with that status.

  • status_neq – None or ObjectStatus, default None. If set, iterates over child objects which don’t have the given status. If status is set, this setting is ignored.
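The precedence between status and status_neq can be illustrated with a small generator. `Status` is a hypothetical stand-in for ObjectStatus, and the child_type filter is left out because its behavior isn't documented above.

```python
import enum
from dataclasses import dataclass

class Status(enum.Enum):  # hypothetical stand-in for ObjectStatus
    ACTIVE = 1
    INACTIVE = 2
    FAILED = 3

@dataclass
class Child:  # hypothetical child object
    name: str
    status: Status

def iter_children(children, status=None, status_neq=None):
    """Yield children filtered as documented: `status` selects an exact match
    and takes precedence; otherwise `status_neq` excludes one status."""
    for child in children:
        if status is not None:
            if child.status == status:
                yield child
        elif status_neq is not None:
            if child.status != status_neq:
                yield child
        else:
            yield child  # no filter: every child, regardless of status
```

Passing both arguments behaves exactly like passing status alone, matching "If status is set, this setting is ignored."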

iter_files()

Generator that yields instances of _FileRegexClass generated from relative paths of files in CATALOG_DIR. Only paths that match the regex in _FileRegexClass are returned.
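A rough sketch of the crawl, with a plain compiled regex standing in for _FileRegexClass. The directory layout encoded in `FILE_REGEX` (`<frequency>/<variable>.<YYYY>-<YYYY>.nc`) is a hypothetical example, not the framework's convention, and the generator yields dicts of match groups rather than _FileRegexClass instances.

```python
import os
import re

# Hypothetical layout, not the framework's: <frequency>/<variable>.<YYYY>-<YYYY>.nc
FILE_REGEX = re.compile(
    r"(?P<frequency>[^/]+)/(?P<variable>[^/.]+)\.(?P<start>\d{4})-(?P<end>\d{4})\.nc"
)

def iter_matching_files(catalog_dir):
    """Yield a dict of regex fields, plus the relative path, for every file under
    catalog_dir whose relative path fully matches FILE_REGEX."""
    for root, _dirs, files in os.walk(catalog_dir):
        for fname in sorted(files):
            rel_path = os.path.relpath(os.path.join(root, fname), catalog_dir)
            rel_path = rel_path.replace(os.sep, "/")  # match on POSIX-style paths
            match = FILE_REGEX.fullmatch(rel_path)
            if match:  # non-matching files are silently skipped
                yield dict(match.groupdict(), rel_path=rel_path)
```

Each yielded dict corresponds to one catalog row, with the regex's named groups as the columns, which is the role generate_catalog() plays when it collects these into a DataFrame.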

iter_vars(active=None, pod_active=None)

Iterator over all VarlistEntries (grandchildren) associated with this case. Returns PodVarTuples (namedtuples) of the Diagnostic and VarlistEntry objects corresponding to the POD and its variable, respectively.

Parameters
  • active

    bool or None, default None. Selects the subset of VarlistEntries returned in the namedtuples:

    • active = True: only iterate over currently active VarlistEntries.

    • active = False: only iterate over inactive VarlistEntries (VarlistEntries which have either failed or are currently unused alternate variables).

    • active = None: iterate over both active and inactive VarlistEntries.

  • pod_active – bool or None, default None. Same as active, but filtering the PODs that are selected.
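The two filters can be sketched as follows. `Pod` and `Var` are hypothetical stand-ins for Diagnostic and VarlistEntry, and the field names of `PodVarTuple` are an assumption; only the namedtuple's name comes from the description above.

```python
from collections import namedtuple
from dataclasses import dataclass, field

PodVarTuple = namedtuple("PodVarTuple", ["pod", "var"])  # field names assumed

@dataclass
class Var:  # hypothetical stand-in for VarlistEntry
    name: str
    active: bool = True

@dataclass
class Pod:  # hypothetical stand-in for Diagnostic
    name: str
    varlist: list = field(default_factory=list)
    active: bool = True

def iter_vars(pods, active=None, pod_active=None):
    """Yield PodVarTuples; None means "don't filter" for either flag."""
    for pod in pods:
        if pod_active is not None and pod.active != pod_active:
            continue  # skip PODs excluded by pod_active
        for var in pod.varlist:
            if active is None or var.active == active:
                yield PodVarTuple(pod, var)
```

iter_vars_only() then amounts to keeping only the `var` element of each tuple.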

iter_vars_only(active=None)

Convenience wrapper for iter_vars() that returns only the VarlistEntry objects (grandchildren) from all PODs in this DataSource.

name: str = sentinel.Mandatory
post_fetch_hook(vars)

Called after fetching each batch of query results.

post_query_and_fetch_hook()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

post_query_hook(vars)

Called after set_experiment(), after each query of a new batch of variables.

pre_fetch_hook(vars)

Called before fetching each batch of query results.

pre_query_and_fetch_hook()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

pre_query_hook(vars)

Called before querying the presence of a new batch of variables.

preprocess_data()

Hook to run the preprocessing function on all variables.

query_and_fetch_cleanup(signum=None, frame=None)

Called if the framework terminates abnormally; not called during a normal exit.

query_data()
query_dataset(var)

Find all rows of the catalog matching relevant attributes of the DataSource and of the variable (VarlistEntry). Group these by experiments, and for each experiment make the corresponding DataFrameDataKey and store it in var’s data attribute. Specifically, the data attribute is a dict mapping experiments (labeled by experiment_keys) to data found for that variable by this query (labeled by the DataKeys).
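The grouping step can be sketched without pandas by treating the catalog as a list of row dicts. The experiment-defining columns in `EXPT_COLS` are a hypothetical choice, and row indices stand in for DataKeys; the real method builds DataFrameDataKey objects from the DataFrame.

```python
from collections import defaultdict

EXPT_COLS = ("case_name", "realm")  # hypothetical experiment-defining columns

def query_rows(catalog, **var_attrs):
    """Return {experiment_key: [row indices]} for catalog rows whose columns
    match every attribute in var_attrs. Row indices stand in for DataKeys."""
    data = defaultdict(list)
    for idx, row in enumerate(catalog):
        if all(row.get(k) == v for k, v in var_attrs.items()):
            expt_key = tuple(row[c] for c in EXPT_COLS)  # label the experiment
            data[expt_key].append(idx)
    return dict(data)
```

The resulting dict has the same shape as the data attribute described above: one entry per experiment, each pointing at the data found for the variable in that experiment.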

property remote_data_col

Name of the column in the catalog containing the path to the remote data file.

request_data()

Top-level method to iteratively query, fetch and preprocess all data requested by PODs, switching to alternate requested data as needed.
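The hook docstrings in this class say when each hook runs, but not how the loop is structured. The following is a simplified, hypothetical driver that calls the hooks in the order those docstrings imply. It passes only the variable to fetch_dataset() (the documented method also takes a DataKey), omits preprocessing and error recovery, models switching to alternate data as the caller supplying further batches, and runs the two tear-down calls in an arbitrarily chosen order.

```python
def drive_request(source, batches):
    """Hypothetical driver: call the query/fetch hooks on `source` for each
    batch of variables, with one-time setup and tear-down around the loop."""
    source.setup_query()
    source.setup_fetch()
    try:
        for vars_ in batches:
            source.pre_query_hook(vars_)
            for var in vars_:
                source.query_dataset(var)
            source.set_experiment()
            source.post_query_hook(vars_)
            source.pre_fetch_hook(vars_)
            for var in vars_:
                source.fetch_dataset(var)  # simplified: real method also takes a DataKey
            source.post_fetch_hook(vars_)
    finally:
        # Tear-down runs once, even if a batch raises.
        source.tear_down_fetch()
        source.tear_down_query()

HOOKS = ["setup_query", "setup_fetch", "pre_query_hook", "query_dataset",
         "set_experiment", "post_query_hook", "pre_fetch_hook", "fetch_dataset",
         "post_fetch_hook", "tear_down_fetch", "tear_down_query"]

class RecordingSource:
    """Stand-in data source whose hook methods only record their names."""
    def __init__(self):
        self.calls = []

def _make_hook(name):
    def hook(self, *args):
        self.calls.append(name)
    return hook

for _name in HOOKS:
    setattr(RecordingSource, _name, _make_hook(_name))
```

Running `drive_request(RecordingSource(), [["tas", "pr"]])` leaves a `calls` list that traces the sequence, which is a convenient way to check a real subclass's hook overrides against the documented order.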

resolve_expt(expt_df, obj)

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

resolve_pod_expt(expt_df, obj)

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

resolve_var_expt(expt_df, obj)

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

select_data()
set_experiment()

Ensure that all data about to be fetched comes from the same experiment. If the query that just finished returned data from multiple experiments, either apply data source-specific heuristics to select one or raise an error.

set_expt_key(obj, expt_key)
setup()
setup_fetch()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

setup_pod(pod)

Update POD with information that only becomes available after DataManager and Diagnostic have been configured (i.e., only known at runtime, not from settings.jsonc).

Could arguably be moved into Diagnostic’s init, at the cost of dependency inversion.

setup_query()

Generate an intake_esm catalog of files found in CATALOG_DIR. Attributes of files listed in the catalog (columns of the DataFrame) are taken from the match groups (fields) of the class’s _FileRegexClass.

setup_var(pod, v)

Update VarlistEntry fields with information that only becomes available after DataManager and Diagnostic have been configured (i.e., only known at runtime, not from settings.jsonc).

Could arguably be moved into VarlistEntry’s init, at the cost of dependency inversion.

status: src.core.ObjectStatus = 1
tear_down_fetch()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

tear_down_query()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

variable_dest_path(pod, var)

Returns the absolute path of the POD’s preprocessed, local copy of the file containing the requested dataset. Files not following this convention won’t be found by the POD.
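Because the POD locates its input purely by path, the destination path must be built deterministically. The sketch below assumes a hypothetical convention, `<pod_data_dir>/<frequency>/<case_name>.<var_name>.<frequency>.nc`; the real naming scheme and the real method's arguments (pod, var) may differ.

```python
import os

def dest_path(pod_data_dir, case_name, var_name, frequency):
    """Hypothetical sketch: build an absolute path of the form
    <pod_data_dir>/<frequency>/<case_name>.<var_name>.<frequency>.nc."""
    fname = f"{case_name}.{var_name}.{frequency}.nc"
    return os.path.abspath(os.path.join(pod_data_dir, frequency, fname))
```

Under this convention, `dest_path("/tmp/wk/MyPOD", "CASE", "pr", "day")` gives the absolute form of `/tmp/wk/MyPOD/day/CASE.pr.day.nc`; a file written anywhere else would not be found.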

class src.data_manager.SingleLocalFileDataSource(*args, **kwargs)[source]

Bases: src.data_manager.LocalFileDataSource

DataSource for data stored in a regular directory hierarchy on a locally mounted filesystem. Assumes that all data for each variable (in each experiment) is contained in a single file.

CATALOG_DIR = <src.util.basic._AbstractAttributePlaceholder object>
__post_init__()
property active
property all_columns
check_group_daterange(group_df, expt_key=None, log=<Logger>)

Sort the files found for each experiment by date, verify that the date ranges contained in the files are contiguous in time and that the date range of the files spans the query date range.

child_deactivation_handler(child, child_exc)

When a DataKey (child) has been deactivated during query or fetch, log a message on all VarlistEntries using it, and deactivate any VarlistEntries with no remaining viable DataKeys.

child_status_update(exc=None)
close_log_file(log=True)
col_spec = <src.util.basic._AbstractAttributePlaceholder object>
data_key(value, expt_key=None, status=None)

Constructor for an instance of DataKeyBase that’s used by this DataSource.

deactivate(exc, level=None)
property df

Synonym for the DataFrame containing the catalog.

property failed
fetch_data()
fetch_dataset(var, d_key)

Fetches data corresponding to d_key. Populates its local_data attribute with a list of identifiers for successfully fetched data (paths to locally downloaded copies of data).

property full_name
generate_catalog()

Crawl the directory hierarchy via iter_files() and return the set of found files as rows in a Pandas DataFrame.

get_expt_key(scope, obj, parent_id=None)

Set experiment attributes with case, POD or variable scope. Given obj, construct a DataFrame of experiment attributes that are found in the queried data for all variables in obj.

If more than one choice of experiment is possible, call DataSource-specific heuristics in resolve_func to choose between them.

init_extra_log_handlers()
init_log(log_dir, fmt=None)
is_fetch_necessary(d_key, var=None)
iter_children(child_type=None, status=None, status_neq=None)

Generator iterating over child objects associated with this object.

Parameters
  • status – None or ObjectStatus, default None. If None, iterates over all child objects, regardless of status. If an ObjectStatus value is passed, only iterates over child objects with that status.

  • status_neq – None or ObjectStatus, default None. If set, iterates over child objects which don’t have the given status. If status is set, this setting is ignored.

iter_files()

Generator that yields instances of _FileRegexClass generated from relative paths of files in CATALOG_DIR. Only paths that match the regex in _FileRegexClass are returned.

iter_vars(active=None, pod_active=None)

Iterator over all VarlistEntries (grandchildren) associated with this case. Returns PodVarTuples (namedtuples) of the Diagnostic and VarlistEntry objects corresponding to the POD and its variable, respectively.

Parameters
  • active

    bool or None, default None. Selects the subset of VarlistEntries returned in the namedtuples:

    • active = True: only iterate over currently active VarlistEntries.

    • active = False: only iterate over inactive VarlistEntries (VarlistEntries which have either failed or are currently unused alternate variables).

    • active = None: iterate over both active and inactive VarlistEntries.

  • pod_active – bool or None, default None. Same as active, but filtering the PODs that are selected.

iter_vars_only(active=None)

Convenience wrapper for iter_vars() that returns only the VarlistEntry objects (grandchildren) from all PODs in this DataSource.

name: str = sentinel.Mandatory
post_fetch_hook(vars)

Called after fetching each batch of query results.

post_query_and_fetch_hook()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

post_query_hook(vars)

Called after set_experiment(), after each query of a new batch of variables.

pre_fetch_hook(vars)

Called before fetching each batch of query results.

pre_query_and_fetch_hook()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

pre_query_hook(vars)

Called before querying the presence of a new batch of variables.

preprocess_data()

Hook to run the preprocessing function on all variables.

query_and_fetch_cleanup(signum=None, frame=None)

Called if the framework terminates abnormally; not called during a normal exit.

query_data()
property remote_data_col

Name of the column in the catalog containing the path to the remote data file.

request_data()

Top-level method to iteratively query, fetch and preprocess all data requested by PODs, switching to alternate requested data as needed.

resolve_expt(expt_df, obj)

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

resolve_pod_expt(expt_df, obj)

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

resolve_var_expt(expt_df, obj)

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

select_data()
set_experiment()

Ensure that all data about to be fetched comes from the same experiment. If the query that just finished returned data from multiple experiments, either apply data source-specific heuristics to select one or raise an error.

set_expt_key(obj, expt_key)
setup()
setup_fetch()

Called once, before the iterative request_data() process starts. Use it to, e.g., initialize database or remote filesystem connections.

setup_pod(pod)

Update POD with information that only becomes available after DataManager and Diagnostic have been configured (i.e., only known at runtime, not from settings.jsonc).

Could arguably be moved into Diagnostic’s init, at the cost of dependency inversion.

setup_query()

Generate an intake_esm catalog of files found in CATALOG_DIR. Attributes of files listed in the catalog (columns of the DataFrame) are taken from the match groups (fields) of the class’s _FileRegexClass.

setup_var(pod, v)

Update VarlistEntry fields with information that only becomes available after DataManager and Diagnostic have been configured (i.e., only known at runtime, not from settings.jsonc).

Could arguably be moved into VarlistEntry’s init, at the cost of dependency inversion.

status: src.core.ObjectStatus = 1
tear_down_fetch()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

tear_down_query()

Called once, after the iterative request_data() process ends. Use it to, e.g., close database or remote filesystem connections.

variable_dest_path(pod, var)

Returns the absolute path of the POD’s preprocessed, local copy of the file containing the requested dataset. Files not following this convention won’t be found by the POD.

query_dataset(var)[source]

Verify that only a single file was found from each experiment.
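The extra check this override adds can be sketched on a simplified result: a mapping from experiment key to the list of file paths found. `check_single_file` is a hypothetical helper name, and raising ValueError is an assumed failure mode.

```python
def check_single_file(files_by_expt):
    """Hypothetical check: `files_by_expt` maps experiment keys to lists of file
    paths found by the query. Raise unless each experiment has exactly one file;
    otherwise return {experiment_key: path}."""
    bad = {k: v for k, v in files_by_expt.items() if len(v) != 1}
    if bad:
        raise ValueError(f"Expected exactly one file per experiment, got: {bad}")
    return {k: v[0] for k, v in files_by_expt.items()}
```

Requiring exactly one file (rather than at most one) also guards the `v[0]` lookup against an empty list.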