Source code for drs

#!/usr/bin/env python3
# coding:utf-8

"""CMIP6 Data Reference Syntax (DRS).

(Excerpt from http://goo.gl/v1drZl)

File name template:
-------------------

DRS compilent filename consists of several global attributes as
follows::

    filename = <variable_id>
                _<table_id>
                  _<source_id>
                    _<experiment_id >
                      _<member_id>
                        _<grid_label>
                          [_<time_range>].nc

For time-invariant fields, the last segment (<time_range>) above is
omitted.

All strings appearing in the file name are constructed using only the
following characters: a-z, A-Z, 0-9, and the hyphen ("-"), except the
hyphen must not appear in <variable_id>.  Underscores are prohibited
throughout except as shown in the template.

The <member_id> is constructed from the <sub_experiment_id> and
<variant_label> using the following algorithm::

    if <sub_experiment_id> == "none"
        <member_id> = <variant_label>
    else
        <member_id> = <sub_experiment_id>-<variant_label>
    endif

The <time_range> is a string generated consistent with the following::

    if frequency == "fx" then
        <time_range>=""
    else
        <time_range> = N1-N2
    endif

where N1 and N2 are integers of the form ``yyyy[MM[dd[hh[mm[ss]]]]][<suffix>]``
(expressed as a string, where ``yyyy``, ``MM``, ``dd``, ``hh``, ``mm`` and
``ss`` are integer year, month, day, hour, minute, and second, respectively),
where <suffix> is defined as follows::

    if the variable identified by variable_id has a time dimension with a “climatology” attribute then
         <suffix> = "-clim"
    else
         <suffix> = ""
    endif

and where the precision of the time_range strings is determined by the
<frequency> global attribute.

Example when there is no <sub_experiment_id>::

    tas_Amon_GFDL-CM4_historical_r1i1p1f1_gn_196001-199912.nc

Example with a <sub_experiment_id>::

    pr_day_CNRM-CM6-1_dcppA-hindcast_s1960-r2i1p1f1_gn_198001-198412.nc



Directory structure template:
-----------------------------

DRS complient directory structure consists of several global
attributes as follows::

    Directory structure = <mip_era>/
                            <activity_id>/
                              <institution_id>/
                                <source_id>/
                                  <experiment_id>/
                                    <member_id>/
                                      <table_id>/
                                        <variable_id>/
                                          <grid_label>/
                                            <version>


Note:

- <version> has the form "vYYYYMMDD" (e.g., "v20160314"), indicating a
  representative date for the version.  Note that files contained in a
  single <version> subdirectory at the end of the directory path
  should represent all the available time-samples reported from the
  simulation; a time-series can be split across several files, but all
  the files must be found in the same subdirectory.  This implies that
  <version> will not generally be the actual date that all files in
  the subdirectory were written or published.
- If multiple activities are listed in the global attribute, the first
  one is used in the directory structure.


Example when there is no <sub-experiment_id>::

     CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/1pctCO2/r1i1p1f1/Amon/tas/gn/v20150322

Example with a <sub_experiment_id>::

     CMIP6/DCPP/CNRM-CERFACS/CNRM-CM6-1/dcppA-hindcast/s1960-r2i1p1f3/day/pr/gn/v20160215

"""
__author__ = 'T.Inoue'
__credits__ = 'Copyright (c) 2019 RIST'
__version__ = 'v20190611'
__date__ = '2019/06/11'

from cmiputil.convoc import ConVoc
from cmiputil.braceexpand import braceexpand
import netCDF4 as nc
from pathlib import Path
import re
import glob
# from pprint import pprint


# def getDefaultConf():
#     """
#     Set default values for config file.

#     Currently no need to use config file, do nothing.
#     """
#     pass


[docs]class DRS: """Class for CMIP6 DRS. This class contains attributes necessary to construct a file name/directory name that is valid for CMIP6 DRS (Data Reference Syntax). See above and http://goo.gl/v1drZl for details about DRS as well as CMIP6 global attributes, etc. Instance member variables of this class are: - ``activity_id`` - ``experiment_id`` - ``grid_label`` - ``institution_id`` - ``mip_era`` - ``source_id`` - ``source_id`` - ``sub_experiment_id`` - ``table_id`` - ``time_range`` - ``variable_id`` - ``variant_label`` - ``version`` - ``member_id`` Note that ``member_id`` is not able to set directly, this is constructed by ``sub_experiment_id`` (omittable) and ``variant_label``, via decorated method :meth:`member_id`. You can use the class member :attr:`requiredAttribs`, :attr:`filenameAttribs`, :attr:`filenameAttribsOptional`, :attr:`dirnameAttribs` to know necessary attributes to set this class and a filename/dirname valid for DRS. Note: Attributes as the class member, - ``hasattr(self, a) is False`` : not set explicitly - ``self.a == None`` : not set explicitly - ``self.a == '*'`` : set as is <- not implemented yet - ``type(self.a) == list`` : multiple values for brace expansion. """ #: Attributes managed in this class. requiredAttribs = ( 'activity_id', 'experiment_id', 'grid_label', 'institution_id', 'mip_era', 'source_id', 'sub_experiment_id', 'table_id', 'time_range', 'variable_id', 'variant_label', 'version', ) #: Attributes necessary to construct dirname. dirnameAttribs = ( "mip_era", "activity_id", "institution_id", "source_id", "experiment_id", "member_id", "table_id", "variable_id", "grid_label", "version") #: Attributes necessary to construct filename. filenameAttribs = ( "variable_id", "table_id", "source_id", "experiment_id", "member_id", "grid_label", ) #: Attributes optional to construct filename. filenameAttribsOptional = ( "time_range", ) #: list of <experiment_id> that have sub_experiment _experiments_w_sub = None #: ConVoc instance _cvs = None _debug = False @classmethod def _enable_debug(cls): cls._debug = True @classmethod def _disable_debug(cls): cls._debug = True # @property # def debug(cls): # return cls._debug def __init__(self, file=None, filename=None, dirname=None, do_sanitize=True, **kw): """ Args: file(path-like): CMIP6 netCDF file. filename(str): filename to be used to set attributes. dirname(str): dirname to be used to set attributes. kw(dict): attribute-value pairs do_sanitize(bool): do sanitize or not If `file` is given, it must be a valid CMIP6 netCDF file, and attributes in that file are read and set. Else if `filename` is given, it must be a valid filename as DRS, and attributes are set from components consist of that name. Else if `dirname` is given, it must be a valid directory name as DRS, and attributes are set from components consist of that name. Else attributes are set from `**kw` dict. If `do_sanitize` is ``True``, remove invalid attribute values, else set as-is. You can sanitize *after* via :meth:`doSanitize`. Examples: >>> drs.DRS(filename='tas_Amon_MIROC6_piControl_r1i1p1f1_gn_320001-329912.nc') DRS(experiment_id='piControl', grid_label='gn', mip_era='CMIP6', source_id='MIROC6', table_id='Amon', time_range='320001-329912', variable_id='tas', variant_label='r1i1p1f1') >>> drs.DRS(dirname='/data/CMIP6/CMIP/MIROC/MIROC6/piControl/r1i1p1f1/Amon/tas/gn/v20181212/') DRS(activity_id='CMIP', experiment_id='piControl', grid_label='gn', institution_id='MIROC', mip_era='CMIP6', source_id='MIROC6', table_id='Amon', variable_id='tas', variant_label='r1i1p1f1', version='v20181212') Do or not sanitize; >>> attrs = {k:v for k,v in drs.sample_attrs.items()} >>> attrs['table_id'] = 'INVALID' >>> d = drs.DRS(**attrs) >>> d.table_id Traceback (most recent call last): ... AttributeError: 'DRS' object has no attribute 'table_id' >>> d = drs.DRS(**attrs, do_sanitize=False) >>> d.table_id 'INVALID' """ if (not self.__class__._cvs): self.__class__._cvs = ConVoc() self.mip_era = 'CMIP6' if (file): attrs = self.getAttrsFromGA(file) elif (filename): attrs = self.splitFileName(filename) elif (dirname): attrs = self.splitDirName(dirname) else: attrs = kw self.set(do_sanitize=do_sanitize, **attrs) def __repr__(self): # Since now attributes of self allow to be a list, but at # setter values must be a comma-separated str, must convert # list to str. res = [] for a in self.requiredAttribs: if hasattr(self, a): v = getattr(self, a) if (type(v) is list): v = ', '.join(v) res.append("{}='{}'".format(a, v)) res = (self.__class__.__name__ + '(' + ', '.join(res) + ')') return res def __str__(self): res = ["{}: {!a}".format(k, getattr(self, k)) for k in self.requiredAttribs if hasattr(self, k)] res = "\n".join(res) return res def __eq__(self, other): typecheck = (type(self) == type(other)) res = [(getattr(self, k) == getattr(other, k)) for k in self.requiredAttribs if hasattr(self, k)] return (all(res) and typecheck)
[docs] def getAttrsFromGA(self, file): """ Obtain requiered attributes from the global attributes defined in a valid netCDF file. Args: file(str or path-like?): filename of a valid netCDF file. Returns: dict: whose keys are from :attr:`DRS.requiredAttribs`. """ with nc.Dataset(file, "r") as ds: attrs = {a: getattr(ds, a, None) for a in self.requiredAttribs} attrs = {a: v for a, v in attrs.items() if (v and v != 'none')} return attrs
[docs] def set(self, do_sanitize=True, **argv): """ Set instance attributes, if attribute is in :attr:`requiredAttribs`. In `argv`, - missing attributes are left unset/untouched, - attribute with invalid value is sanitized via :meth:`doSanitize` if ``do_sanitize=True``, - unnecessary attributes are neglected. Each of attributes are checked by :meth:`isValidValueForAttr()` before set. Args: argv (dict): attribute/value pairs do_sanitize(bool): remove invalid values via :meth:`doSanitize` Return: nothing Examples: >>> d = drs.DRS(**drs.sample_attrs) >>> d DRS(activity_id='CMIP', experiment_id='piControl', grid_label='gn', institution_id='MIROC', mip_era='CMIP6', source_id='MIROC6', table_id='Amon', time_range='320001-329912', variable_id='tas', variant_label='r1i1p1f1', version='v20181212') >>> d.set(experiment_id='amip') >>> d.experiment_id 'amip' >>> d.set(experiment_id='invalid_experiment') >>> d.experiment_id Traceback (most recent call last): ... AttributeError: 'DRS' object has no attribute 'experiment_id' In the last example, invalid value for `experiment_id` is sanitized since ``do_sanitize=True`` by default. """ attribs = {a: argv[a] for a in argv.keys() if a in self.requiredAttribs} for a, v in attribs.items(): v = [vv.strip() for vv in v.split(',')] if len(v) > 1: setattr(self, a, v) else: setattr(self, a, v[0]) # self.set_member_id() if (do_sanitize): self.doSanitize()
[docs] def fileName(self, prefix=None, w_time_range=True, allow_asterisk=True): """ Construct filename from current instance member attributes. Args: prefix(path-like): prepend to the resulting filename w_time_range(bool): result contains <time_range> part or not allow_asterisk(bool): allow result contains ``*`` Raises: AttributeError: any attributes are missing. Returns: path-like: filename Note: By definition, including <time_range> part or not is decided by the attribute <frequency> is 'fx' or not. <frequency> is the same with the attribute <table_id>, so in this method if ``self.table_id == 'fx'`` force `w_time_range` to be ``True``. If ``self.table_id = '*'`` or set multi values and you want force <time_range> part to be omitted, set ``w_time_range=False`` explicitly. Examples: Usual case; >>> str(drs.DRS(**drs.sample_attrs).fileName()) 'tas_Amon_MIROC6_piControl_r1i1p1f1_gn_320001-329912.nc' With ``sub_experiment_id``; >>> str(drs.DRS(**drs.sample_attrs_w_subexp).fileName()) 'rsdscs_Amon_IPSL-CM6A-LR_dcppC-atl-pacemaker_s1950-r1i1p1f1_gr_192001-201412.nc' No ``time_range``; >>> str(drs.DRS(**drs.sample_attrs_no_time_range).fileName(w_time_range=False)) 'areacella_fx_MIROC6_historical_r1i1p1f1_gn.nc' With prefix; >>> prefix=Path('/data/CMIP6/') >>> str(drs.DRS(**drs.sample_attrs).fileName(prefix)) '/data/CMIP6/tas_Amon_MIROC6_piControl_r1i1p1f1_gn_320001-329912.nc' Invalid value for valid attribute; >>> attrs = {k: v for k, v in drs.sample_attrs.items()} >>> attrs.update({'table_id': 'invalid'}) >>> str(drs.DRS(**attrs).fileName()) 'tas_*_MIROC6_piControl_r1i1p1f1_gn_320001-329912.nc' >>> str(drs.DRS(**attrs).fileName(allow_asterisk=False)) Traceback (most recent call last): ... AttributeError: 'DRS' object has no attribute 'table_id' Missing attributes; >>> attrs = {k: v for k, v in drs.sample_attrs.items()} >>> del attrs['time_range'] >>> str(drs.DRS(**attrs).fileName()) 'tas_Amon_MIROC6_piControl_r1i1p1f1_gn_*.nc' >>> str(drs.DRS(**attrs).fileName(allow_asterisk=False)) Traceback (most recent call last): ... AttributeError: 'DRS' object has no attribute 'time_range' Allow multi values; >>> attrs = {k: v for k, v in drs.sample_attrs.items()} >>> attrs.update({'experiment_id':'amip, piControl'}) >>> str(drs.DRS(**attrs).fileName()) 'tas_Amon_MIROC6_{amip,piControl}_r1i1p1f1_gn_320001-329912.nc' """ attr = {} for a in self.filenameAttribs + self.filenameAttribsOptional: try: v = getattr(self, a) except AttributeError: if (allow_asterisk): v = '*' else: raise if type(v) is list: v = '{'+','.join(v)+'}' attr[a] = v if (attr['table_id'] == 'fx'): w_time_range = False if w_time_range: f = ("{variable_id}_{table_id}_{source_id}_{experiment_id}" "_{member_id}_{grid_label}_{time_range}.nc").format(**attr) else: f = ("{variable_id}_{table_id}_{source_id}_{experiment_id}" "_{member_id}_{grid_label}.nc").format(**attr) f = Path(f) if (prefix): f = Path(prefix) / f return f
[docs] def fileNameList(self, prefix=None): """ Returns a list of filenames constructed by the instance member attributes that may contains '*' and/or braces. Returns: list of str: filenames Examples: >>> attrs = {k: v for k, v in drs.sample_attrs.items()} >>> attrs.update({'experiment_id':'amip, piControl'}) >>> del attrs['time_range'] >>> str(drs.DRS(**attrs).fileName()) 'tas_Amon_MIROC6_{amip,piControl}_r1i1p1f1_gn_*.nc' >>> dlist = drs.DRS(**attrs).fileNameList() # doctest: +SKIP >>> [str(d) for d in dlist] # doctest: +SKIP ['tas_Amon_MIROC6_amip_r1i1p1f1_gn_*.nc', 'tas_Amon_MIROC6_piControl_r1i1p1f1_gn_*.nc'] The last example will return ``[]`` if expanded files do not exist. """ fname = self.fileName(prefix=prefix) flist = [glob.glob(p) for p in braceexpand(str(fname))] return [f for ff in flist for f in ff]
[docs] def dirName(self, prefix=None, allow_asterisk=True): """ Construct directory name by DRS from :class:`DRS` instance members. If `allow_asterisk` is ``True``, invalid If you want glob/brace expaned list, use :meth:`dirNameList` instead. Args: prefix (Path-like): prepend to the result path. allow_asterisk: allow result contains ``*`` or not. Raises: AttributeError: any attributes are missing or invalid and ``allow_asterisk=False`` Returns: Path-like : directory name Examples: Usual case; >>> str(drs.DRS(**drs.sample_attrs).dirName()) 'CMIP6/CMIP/MIROC/MIROC6/piControl/r1i1p1f1/Amon/tas/gn/v20181212' With ``sub_experiment_id``; >>> str(drs.DRS(**drs.sample_attrs_w_subexp).dirName()) 'CMIP6/DCPP/IPSL/IPSL-CM6A-LR/dcppC-atl-pacemaker/s1950-r1i1p1f1/Amon/rsdscs/gr/v20190110' Invalid value for valid attribute; >>> attrs = {k:v for k,v in drs.sample_attrs.items()} >>> attrs['table_id'] = 'invalid' >>> str(drs.DRS(**attrs).dirName()) 'CMIP6/CMIP/MIROC/MIROC6/piControl/r1i1p1f1/*/tas/gn/v20181212' >>> str(drs.DRS(**attrs).dirName(allow_asterisk=False)) Traceback (most recent call last): ... AttributeError: 'DRS' object has no attribute 'table_id' Missing attributes; >>> attrs = {k:v for k,v in drs.sample_attrs.items()} >>> del attrs['experiment_id'] >>> str(drs.DRS(**attrs).dirName(prefix='/data/')) '/data/CMIP6/CMIP/MIROC/MIROC6/*/r1i1p1f1/Amon/tas/gn/v20181212' >>> str(drs.DRS(**attrs).dirName(prefix='/data/', allow_asterisk=False)) Traceback (most recent call last): ... AttributeError: 'DRS' object has no attribute 'experiment_id' Allow multi values; >>> attrs = {k: v for k, v in drs.sample_attrs.items()} >>> attrs.update({'experiment_id':'amip, piControl'}) >>> str(drs.DRS(**attrs).dirName()) 'CMIP6/CMIP/MIROC/MIROC6/{amip,piControl}/r1i1p1f1/Amon/tas/gn/v20181212' """ attr = {} for a in self.dirnameAttribs: try: v = getattr(self, a) except AttributeError: if allow_asterisk: v = '*' else: raise if type(v) is list: v = '{'+','.join(v)+'}' attr[a] = v d = Path( attr["mip_era"], attr["activity_id"], attr["institution_id"], attr["source_id"], attr["experiment_id"], attr["member_id"], attr["table_id"], attr["variable_id"], attr["grid_label"], attr["version"]) if (prefix): d = Path(prefix) / d return d
[docs] def dirNameList(self, prefix=None): """ Return list of directory name constructed by DRS from :class:`DRS` instance members, that contains asterisk and/or braces Args: prefix(path-like): dirname to prepend. Returns: list of path-like: directory names Note: Non-existent directories are omitted. Examples: >>> attrs = {k: v for k, v in drs.sample_attrs.items()} >>> attrs.update({'experiment_id':'amip, piControl'}) >>> del attrs['version'] >>> str(drs.DRS(**attrs).dirName()) 'CMIP6/CMIP/MIROC/MIROC6/{amip,piControl}/r1i1p1f1/Amon/tas/gn/*' >>> res = drs.DRS(**attrs).dirNameList(prefix='/data') >>> ref = [Path('/data/CMIP6/CMIP/MIROC/MIROC6/amip/r1i1p1f1/Amon/tas/gn/v20181214'), ... Path('/data/CMIP6/CMIP/MIROC/MIROC6/piControl/r1i1p1f1/Amon/tas/gn/v20181212')] >>> print(ref == res) True The last example will return ``[]`` if expanded directories do not exist. """ dname = self.dirName(prefix=prefix) # may contain '*' and braces plist = [glob.iglob(p) for p in braceexpand(str(dname))] return [Path(p) for pp in plist for p in pp]
[docs] def splitFileName(self, fname, validate=False): """Split filename to attributes for DRS. If ``varidate=False``, just split only. So if the `fname` consist of the same number of components with DRS-valid filename, no error happens. You should set `validate=True` or use :meth:`isValidValueForAttr` by yourself. Args: fname (Path-like) : filename validate(bool) : validate the resulting attribute/value pair Raises: ValueError: if `fname` is invalid for DRS. Returns: dict: attribute and it's value Note: Instance members keep untouched, give :meth:`set` the result of this method. Examples: >>> fname = "tas_Amon_MIROC6_piControl_r1i1p1f1_gn_320001-329912.nc" >>> drs.DRS().splitFileName(fname) {'experiment_id': 'piControl', 'grid_label': 'gn', 'source_id': 'MIROC6', 'table_id': 'Amon', 'time_range': '320001-329912', 'variable_id': 'tas', 'variant_label': 'r1i1p1f1'} >>> fname='invalid_very_long_file_name.nc' >>> drs.DRS().splitFileName(fname) Traceback (most recent call last): ... ValueError: not follow the name template: "invalid_very_long_file_name.nc" >>> fname='invalid_but_same_length_with_drs.nc' >>> drs.DRS().splitFileName(fname) {'experiment_id': 'length', 'grid_label': 'drs', 'source_id': 'same', 'table_id': 'but', 'variable_id': 'invalid', 'variant_label': 'with'} >>> drs.DRS().splitFileName(fname, validate=True) Traceback (most recent call last): ... ValueError: "length" is invalid for <experiment_id> """ try: (variable_id, table_id, source_id, experiment_id, member_id, grid_label) = Path(fname).stem.split('_', 5) except ValueError: raise ValueError(f'not follow the name template: "{fname}"') try: (grid_label, time_range) = grid_label.split('_') except ValueError: # time_range = None pass try: (sub_experiment_id, variant_label) = member_id.split('-') except ValueError: variant_label = member_id # sub_experiment_id = None res = {} for a in self.requiredAttribs: try: res[a] = eval(a) except NameError: pass if validate: for a, v in res.items(): if not self.isValidValueForAttr(v, a): raise ValueError(f'"{v}" is invalid for <{a}>') return res
[docs] def splitDirName(self, dname, validate=False): """Split dirname to attributes for DRS. If ``varidate=False``, just split only. So if the `dname` consist of the same number of components with DRS-valid directory name, no error happens. You should set `validate=True` or use :meth:`isValidValueForAttr` by yourself. Args: dname (path-like) : directory name validate(bool) : validate the resulting attribute/value pair Returns: dict: attribute and it's value Note: Instance members keep untouched, give :meth:`set` the result of this method. Examples: >>> dname = 'CMIP6/CMIP/MIROC/MIROC6/piControl/r1i1p1f1/Amon/tas/gn/v20181212' >>> drs.DRS().splitDirName(dname) {'activity_id': 'CMIP', 'experiment_id': 'piControl', 'grid_label': 'gn', 'institution_id': 'MIROC', 'mip_era': 'CMIP6', 'source_id': 'MIROC6', 'table_id': 'Amon', 'variable_id': 'tas', 'variant_label': 'r1i1p1f1', 'version': 'v20181212', 'prefix': ''} With `prefix`; >>> dname = ('/work/data/CMIP6/CMIP6/CMIP/MIROC/MIROC6/piControl/r1i1p1f1/Amon/tas/gn/v20181212') >>> drs.DRS().splitDirName(dname) {'activity_id': 'CMIP', 'experiment_id': 'piControl', 'grid_label': 'gn', 'institution_id': 'MIROC', 'mip_era': 'CMIP6', 'source_id': 'MIROC6', 'table_id': 'Amon', 'variable_id': 'tas', 'variant_label': 'r1i1p1f1', 'version': 'v20181212', 'prefix': '/work/data/CMIP6'} Invalid case; >>> dname = 'Some/Invalid/Path' >>> drs.DRS().splitDirName(dname) Traceback (most recent call last): ... ValueError: Invalid dirname: "Some/Invalid/Path" >>> dname = 'Some/Invalid/but/has/occasionally/the/same/number/of/component/' >>> drs.DRS().splitDirName(dname) {'activity_id': 'Invalid', 'experiment_id': 'occasionally', 'grid_label': 'of', 'institution_id': 'but', 'mip_era': 'Some', 'source_id': 'has', 'table_id': 'same', 'variable_id': 'number', 'variant_label': 'the', 'version': 'component', 'prefix': ''} >>> drs.DRS().splitDirName(dname, validate=True) Traceback (most recent call last): ... ValueError: "Invalid" is invalid for <activity_id> """ res = {} d = Path(dname) try: (version, grid_label, variable_id, table_id, member_id, experiment_id, source_id, institution_id, activity_id, mip_era) = d.parts[-1:-11:-1] except ValueError: raise ValueError(f'Invalid dirname: "{dname}"') try: (sub_experiment_id, variant_label) = member_id.split('-') except ValueError: variant_label = member_id for k in self.requiredAttribs: try: res[k] = eval(k) except NameError: pass if validate: for a, v in res.items(): if not self.isValidValueForAttr(v, a): raise ValueError(f'"{v}" is invalid for <{a}>') if (len(d.parts) > 10): res["prefix"] = str(Path(*d.parts[:-10])) else: res["prefix"] = '' return res
[docs] def isValidPath(self, path, directory=False, separated=False): """ Check if given `path` is DRS compliant. `path` may be a URL obtained by ESGF Search function. See :mod:`cmiputil.esgfsearch` for details. Args: path (Path-like) : pathname to be checked directory (bool) : treat `path` is a directory separated (bool) : return a tuple of two dicts Returns: bool or list of bool : valid or not (see below) If `separate` is True, return a tuple of two dicts, first element is for the filename, second is for the directory name, both dicts' key/value shows that each attributes are valid or not. If `directory` is ``True``, first elements is ``{'all': True}``. Examples: >>> ourl = ('http://vesg.ipsl.upmc.fr/thredds/fileServer/cmip6/DCPP/' ... 'IPSL/IPSL-CM6A-LR/dcppC-pac-pacemaker/s1920-r1i1p1f1/' ... 'Amon/rsdscs/gr/v20190110/rsdscs_Amon_IPSL-CM6A-LR_' ... 'dcppC-pac-pacemaker_s1920-r1i1p1f1_gr_192001-201412.nc') >>> drs.DRS().isValidPath(url) True >>> drs.DRS().isValidPath(url, separated=True) ({'experiment_id': True, 'grid_label': True, 'source_id': True, 'sub_experiment_id': True, 'table_id': True, 'time_range': True, 'variable_id': True, 'variant_label': True}, {'activity_id': True, 'experiment_id': True, 'grid_label': True, 'institution_id': True, 'mip_era': True, 'source_id': True, 'sub_experiment_id': True, 'table_id': True, 'variable_id': True, 'variant_label': True, 'version': True}) >>> url = ('http://vesg.ipsl.upmc.fr/thredds/fileServer/cmip6/DCPP/' ... 'IPSL/IPSL-CM6A-LR/dcppC-pac-pacemaker/s1920-r1i1p1f1/' ... 'Amon/rsdscs/gr/v20190110') >>> drs.DRS().isValidPath(url) False >>> drs.DRS().isValidPath(url, directory=True) True """ p = Path(path) if (directory): fname = None dname = p else: fname = p.name dname = p.parent if (fname): try: f_attr = self.splitFileName(fname) except ValueError: f_res = {'all': False} else: f_res = {a: self.isValidValueForAttr(f_attr[a], a) for a in f_attr if a in self.requiredAttribs} else: f_res = {'all': True} if (dname != Path('.')): try: d_attr = self.splitDirName(dname) except ValueError: d_res = {'all': False} else: d_res = {a: self.isValidValueForAttr(d_attr[a], a) for a in d_attr if a in self.requiredAttribs} else: d_res = {'all': True} if separated: return f_res, d_res else: return all(f_res.values()) and all(d_res.values())
[docs] def isValid(self, silent=True): """ Check if attributes are valid as DRS. Args: silent(bool): no message even if something is invalid. Return: bool: all attributes are valid or not. Examples: >>> d = drs.DRS(**drs.sample_attrs) >>> d.isValid() True >>> d.activity_id = 'InvalidMIP' >>> d.isValid() False """ return self._validate(silent=silent, delete_invalid=False)
[docs] def isValidValueForAttr(self, value, attr): """ Check `value` is valid for the attribute `attr`. Args: value (object) : value for `attr` attr (object) : global attribute Raises: AttributeError: raises when `attr` is invalid for DRS. Returns: bool: whether `value` is valid for the attribute `attr` Examples: >>> d = drs.DRS() >>> d.isValidValueForAttr('Amon', 'table_id') True >>> d.isValidValueForAttr('Invalid', 'source_id') False >>> d.isValidValueForAttr('piControl', 'experiment_id') True >>> d.isValidValueForAttr('piControl', 'experiments_id') Traceback (most recent call last): ... AttributeError: ('Invalid Attribute for DRS:', 'experiments_id') >>> d.isValidValueForAttr('*', 'institution_id') True >>> d.isValidValueForAttr('MIROC*', 'source_id') True """ if value is None: return False if '*' in value: return True if attr == 'sub_experiment_id': return (self._cvs.isValidValueForAttr(value, attr)) elif attr in self._cvs.managedAttribs: return self._cvs.isValidValueForAttr(value, attr) elif attr == 'time_range': return self._check_time_range(value) elif attr == 'version': return self._check_version(value) elif attr == 'variable_id': return self._check_variable_id(value) elif attr == 'variant_label': return self._check_variant_label(value) elif attr == 'mip_era': return (value.lower() == 'cmip6') else: raise AttributeError('Invalid Attribute for DRS:', attr)
[docs] def getAttribs(self): """ Return current instance attributes defined of :attr:`requiredAttribs` and their values. Returns: dict: attribute-value pairs. """ return {k: getattr(self, k) for k in self.requiredAttribs if hasattr(self, k)}
@property def member_id(self): """Getter for the attribute <member_id>. See the definition of this attribute in :mod:`cmiputil.drs`. """ if not self.__class__._experiments_w_sub: exps = self._cvs.getAttrib('experiment_id') self.__class__._experiments_w_sub = [ e for e in exps.keys() if exps[e]['sub_experiment_id'][0] != 'none'] if (hasattr(self, 'variant_label')): if (hasattr(self, 'sub_experiment_id') and self.sub_experiment_id != 'none'): subexp = self.sub_experiment_id varlab = self.variant_label res = f"{subexp}-{varlab}" else: res = self.variant_label return res else: return '*'
[docs] def doSanitize(self, silent=True): """ Sanitize instances. That is, remove invalid values for valid attributes. Args: silent(bool): do it silently or not Returns: nothing Examples: >>> d = drs.DRS(**drs.sample_attrs) >>> d.activity_id = 'InvalidMIP' >>> hasattr(d, 'activity_id') True >>> d.doSanitize() >>> hasattr(d, 'activity_id') False For above case, You should use ``d.set(activity_id='...')`` instead of setting an attribute directly. See :meth:`set`. """ self._validate(silent=silent, delete_invalid=True)
def _validate(self, silent=False, delete_invalid=False): fmt = 'Warining: <{}> has invalid value "{}".' res = {} for a in self.requiredAttribs: v = getattr(self, a, None) if v is None: pass elif type(v) is list: # res[a] = True # tentative vals = {vv: self.isValidValueForAttr(vv, a) for vv in v} res[a] = all(vals.values()) if (not res[a]): if (not silent): print(fmt.format(a, [vv for vv in v if not vals[vv]])) if (delete_invalid): setattr(self, a, [vv for vv in v if vals[vv]]) else: if self.isValidValueForAttr(v, a): res[a] = True else: res[a] = False if (not silent): print(fmt.format(a, v)) if (delete_invalid): delattr(self, a) return all(res.values()) def _check_time_range(self, value): # TODO: precision and `-clim` depends on the attribute `frequency`. # but I don't need quality assurance. if (value is None): return False elif (value == ""): return False # pat = re.compile(r'\d{4,8}(-clim)?-\d{4,8}(-clim)?') pat = re.compile(r'\d{4}(\d\d(\d\d(\d\d(\d\d(\d\d)?)?)?)?)?' r'(-clim)?' r'-' r'\d{4}(\d\d(\d\d(\d\d(\d\d(\d\d)?)?)?)?)?' r'(-clim)?' ) return pat.fullmatch(value) is not None def _check_version(self, value): if (value is None): return False pat = re.compile(r'v\d{8}') return pat.fullmatch(value) is not None def _check_variable_id(self, value): # TODO: Is there any method to check ? return value is not None def _check_variant_label(self, value): if (value is None): return False pat = re.compile(r'r\d+i\d+p\d+f\d+') return pat.fullmatch(value) is not None
sample_attrs = { 'activity_id': 'CMIP', 'experiment_id': 'piControl', 'grid_label': 'gn', 'institution_id': 'MIROC', 'source_id': 'MIROC6', 'table_id': 'Amon', 'time_range': '320001-329912', 'variable_id': 'tas', 'variant_label': 'r1i1p1f1', 'version': 'v20181212', 'non_necessary_attribute': 'hoge' } sample_fname = "tas_Amon_MIROC6_piControl_r1i1p1f1_gn_320001-329912.nc" sample_dname = ("CMIP6/CMIP/MIROC/MIROC6/piControl/" "r1i1p1f1/Amon/tas/gn/v20181212") sample_attrs_w_subexp = { 'activity_id': 'DCPP', 'experiment_id': 'dcppC-atl-pacemaker', 'grid_label': 'gr', 'institution_id': 'IPSL', 'source_id': 'IPSL-CM6A-LR', 'sub_experiment_id': 's1950', 'table_id': 'Amon', 'time_range': '192001-201412', 'variable_id': 'rsdscs', 'variant_label': 'r1i1p1f1', 'version': 'v20190110'} sample_fname_w_subexp = ('rsdscs_Amon_IPSL-CM6A-LR_dcppC-atl-pacemaker' '_s1950-r1i1p1f1_gr_192001-201412.nc') sample_dname_w_subexp = ('CMIP6/DCPP/IPSL/IPSL-CM6A-LR/dcppC-atl-pacemaker/' 's1950-r1i1p1f1/Amon/rsdscs/gr/v20190110') sample_attrs_no_time_range = { 'activity_id': 'CMIP', 'experiment_id': 'historical', 'grid_label': 'gn', 'institution_id': 'MIROC', 'mip_era': 'CMIP6', 'prefix': '', 'source_id': 'MIROC6', 'table_id': 'fx', 'variable_id': 'areacella', 'variant_label': 'r1i1p1f1', 'version': 'v20190311'} sample_fname_no_time_range = 'areacella_fx_MIROC6_historical_r1i1p1f1_gn.nc' sample_dname_no_time_range = ('CMIP6/CMIP/MIROC/MIROC6/historical/r1i1p1f1/' 'fx/areacella/gn/v20190311/') if __name__ == "__main__": from cmiputil import drs import doctest doctest.testmod()