Source code for dds

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module to parse DDS (Dataset Descriptor Structure) used in OPeNDAP.

DDS
---

For the definition of DDS, see `OpenDAP UserGuide`_.
In this module, we change the notation in the DDS syntax as follows:

    | *declarations* := list(*declaration*)
    | *declaration* := *Var* | *Struct*
    | *Struct* := *stype* { *declarations* } (*name* | *name* *arr*)
    | *stype* := Dataset|Structure|Sequence|Grid
    | *Grid* := Grid { ARRAY: *declaration* MAPS: *declarations* } (*name* | *name* *arr*)
    | *Var* := *btype* (*name* | *name* *arr*)
    | *btype* := Byte|Int32|UInt32|Float64|String|Url| ...
    | *arr* := [integer] | [*name* = integer]

As you can see from above syntax, one *Struct* can contain other *Struct* recursively, and consists
the tree structure. The root of the tree must be one "Dataset".

In this module, each element of above syntax is implemented as one class.

Basic Usage
-----------

Text form of DDS will be obtained by, for example,
:meth:`.ESGFDataInfo.getDDS`. Use :func:`parse_dataset` to parse it to
get the tree structure. The root of the tree is a :class:`Dataset`
instance, and you can access nodes and leafs of the tree by dot
notation (see also 'Example' section below)::

    ds = parse_dataset(text=sample1)
    ds.tas  # Grid('tas, arrary=Var(tas, ...), maps={'time':..., 'lat':..., 'lon':...})
    ds.tas.array.arr[0]  # Arr('time', 8412)


.. _OpenDAP UserGuide: https://opendap.github.io/documentation/UserGuideComprehensive.html#DDS

Example:

    >>> sample1 = '''
    ... Dataset {
    ...     Float64 lat[lat = 160];
    ...     Float64 lat_bnds[lat = 160][bnds = 2];
    ...     Float64 lon[lon = 320];
    ...     Float64 lon_bnds[lon = 320][bnds = 2];
    ...     Float64 height;
    ...     Float64 time[time = 8412];
    ...     Float64 time_bnds[time = 8412][bnds = 2];
    ...     Grid {
    ...      ARRAY:
    ...         Float32 tas[time = 8412][lat = 160][lon = 320];
    ...      MAPS:
    ...         Float64 time[time = 8412];
    ...         Float64 lat[lat = 160];
    ...         Float64 lon[lon = 320];
    ...     } tas;
    ... } CMIP6.CMIP.MRI.MRI-ESM2-0.piControl.r1i1p1f1.Amon.tas.gn.tas.20190222.aggregation.1;'''
    >>> sample1_struct = Dataset(
    ...    'CMIP6.CMIP.MRI.MRI-ESM2-0.piControl.r1i1p1f1.Amon.tas.gn.tas.20190222.aggregation.1',
    ...    {
    ...        'lat':
    ...        Var('lat', 'Float64', arr=[Arr('lat', 160)]),
    ...        'lat_bnds':
    ...        Var('lat_bnds', 'Float64', arr=[Arr('lat', 160),
    ...                                        Arr('bnds', 2)]),
    ...        'lon':
    ...        Var('lon', 'Float64', arr=[Arr('lon', 320)]),
    ...        'lon_bnds':
    ...        Var('lon_bnds', 'Float64', arr=[Arr('lon', 320),
    ...                                        Arr('bnds', 2)]),
    ...        'height':
    ...        Var('height', 'Float64'),
    ...        'time':
    ...        Var('time', 'Float64', arr=[Arr('time', 8412)]),
    ...        'time_bnds':
    ...        Var('time_bnds', 'Float64', arr=[Arr('time', 8412),
    ...                                         Arr('bnds', 2)]),
    ...        'tas':
    ...        Grid('tas',
    ...             array=Var(
    ...                 'tas',
    ...                 'Float32',
    ...                 arr=[Arr('time', 8412),
    ...                      Arr('lat', 160),
    ...                      Arr('lon', 320)]),
    ...             maps={
    ...                 'time': Var('time', 'Float64', arr=[Arr('time', 8412)]),
    ...                 'lat': Var('lat', 'Float64', arr=[Arr('lat', 160)]),
    ...                 'lon': Var('lon', 'Float64', arr=[Arr('lon', 320)])
    ...             })
    ...    })
    >>> sample1_struct == parse_dataset(sample1)
    True

    >>> from cmiputil import dds
    >>> sample2 = '''
    ... Dataset {
    ...   Int32 catalog_number;
    ...   Sequence {
    ...     String experimenter;
    ...     Int32 time;
    ...     Structure {
    ...       Float64 latitude;
    ...       Float64 longitude;
    ...     } location;
    ...     Sequence {
    ...       Float64 depth;
    ...       Float64 salinity;
    ...       Float64 oxygen;
    ...       Float64 temperature;
    ...     } cast;
    ...   } station;
    ... } data;
    ... '''
    >>> sample2_struct = Dataset(
    ...     'data', {
    ...         'catalog_number':
    ...         Var('catalog_number', 'Int32'),
    ...         'station':
    ...         Sequence(
    ...             'station', {
    ...                 'experimenter':
    ...                 Var('experimenter', 'String'),
    ...                 'time':
    ...                 Var('time', 'Int32'),
    ...                 'location':
    ...                 Structure(
    ...                     'location', {
    ...                         'latitude': Var('latitude', 'Float64'),
    ...                         'longitude': Var('longitude', 'Float64')
    ...                     }),
    ...                 'cast':
    ...                 Sequence(
    ...                     'cast', {
    ...                         'depth': Var('depth', 'Float64'),
    ...                         'salinity': Var('salinity', 'Float64'),
    ...                         'oxygen': Var('oxygen', 'Float64'),
    ...                         'temperature': Var('temperature', 'Float64')
    ...                     })
    ...             })
    ...     })
    >>> sample2_struct == parse_dataset(sample2)
    True

    >>> sample3 = '''
    ... Dataset {
    ...     Structure {
    ...         Float64 lat;
    ...         Float64 lon;
    ...     } location;
    ...     Structure {
    ...         Int32 minutes;
    ...         Int32 day;
    ...         Int32 year;
    ...     } time;
    ...     Float64 depth[500];
    ...     Float64 temperature[500];
    ... } xbt-station;
    ... '''
    >>> sample3_struct = Dataset(
    ...     'xbt-station', {
    ...         'location':
    ...         Structure('location', {
    ...             'lat': Var('lat', 'Float64'),
    ...             'lon': Var('lon', 'Float64')
    ...         }),
    ...         'time':
    ...         Structure(
    ...             'time', {
    ...                 'minutes': Var('minutes', 'Int32'),
    ...                 'day': Var('day', 'Int32'),
    ...                 'year': Var('year', 'Int32')
    ...             }),
    ...         'depth':
    ...         Var('depth', 'Float64', arr=[Arr('', 500)]),
    ...         'temperature':
    ...         Var('temperature', 'Float64', arr=[Arr('', 500)])
    ...     })
    >>> sample3_struct == parse_dataset(sample3)
    True
"""

import enum
import re
import textwrap as tw
from pprint import pprint

_debug = False


def _enable_debug():
    global _debug
    _debug = True


def _disable_debug():
    global _debug
    _debug = False


def _debug_write(text):
    global _debug
    if _debug:
        print(text)


[docs]class BType(enum.Enum): """ Values for :attr:`.Var.btype`. """ Byte = 'Byte' Int16 = 'Int16' Int32 = 'Int32' UInt32 = 'UInt32' Float32 = 'Float32' Float64 = 'Float64' String = 'String' Url = 'Url'
[docs]class SType(enum.Enum): """ Values for :attr:`Struct.stype` """ Dataset = 'Dataset' Structure = 'Structure' Sequence = 'Sequence' Grid = 'Grid'
_idents_btype = [t.name for t in BType] _idents_stype = [t.name for t in SType] _idents = _idents_btype + _idents_stype _pat_idents_stype = re.compile(r'^\s*(' + '|'.join(_idents_stype) + ')') _pat_ident = re.compile(r'^\s*(' + '|'.join(_idents) + ')') _pat_struct = re.compile( r'^\s*(' + r'|'.join(_idents_stype) + r')\s*\{(.*)\}\s*(\S+);\s*', re.DOTALL) _pat_dataset = re.compile(r'^\s*Dataset\s+' r'\{(.+)\}\s*(\S+);\s*$', re.DOTALL) _pat_grid = re.compile( r'^\s*Grid\s*\{\s*Array:(.+)Maps:' r'\s*(.+)\s*\}\s*(\w+);', re.IGNORECASE | re.DOTALL) _pat_varline = re.compile(r'^\s*(\w+)\s*(\w+)(\[.+\])*;\s*$', re.DOTALL) _pat_arrdecl = re.compile(r'\[(\w+?)\s*=\s*(\d+)\]') _pat_arrdecl_valonly = re.compile(r'^s*\[(\d+)]') _pat_arrdecl_line = re.compile(r'\[(?:\w+?\s*=)*\s*\d+\]')
[docs]class Decls(dict): """ Class for *declarations*. | *declarations* := list(*declaration*) In this module, *declarations* are expressed as `dict`, not `list`. At this point, this class is just an alias for `dict`. """ pass
[docs]class Decl: """ Class for *declaration*, that is, base class for :class:`Var` and :class:`Struct`. No need to use this class explicitly. | *declaration* := *Var* | *Struct* """ def __init__(self, name=''): self.name = name def __eq__(self, other): _debug_write(f'Decl.__eq__():{type(self)},{type(other)}') if not isinstance(other, type(self)): return False res = [getattr(self, a) == getattr(other, a) for a in self.__dict__] return all(res)
[docs] def text_formatted(self, indent=None, linebreak=True): pass
[docs]class Struct(Decl): """ Class for *struct*, that is, base class for :class:`Structure`, :class:`Sequence`, :class:`Grid` and :class:`Dataset`. Do not use this directly. | *struct* := *stype* { *declarations* } *var* | *stype* := Dataset|Structure|Sequence|Grid You can access items of ``self.decl`` as if they are the attribute of this class, via dot notation. Examples: >>> text = ''' ... Sequence { ... Float64 depth; ... Float64 salinity; ... Float64 oxygen; ... Float64 temperature; ... } cast;''' >>> s = Sequence(text=text) >>> s.salinity Var('salinity', 'Float64') >>> text = ''' ... Dataset { ... Int32 catalog_number; ... Sequence { ... String experimenter; ... Int32 time; ... Structure { ... Float64 latitude; ... Float64 longitude; ... } location; ... } station; ... } data;''' >>> d = parse_dataset(text) >>> d.station.location.latitude Var('latitude', 'Float64') Attributes: name(str): *name* stype(SType): *stype* decl(Decls)): *declarations* """ stype = None def __init__(self, name='', decl=None, text=None): """ Parameters: name(str): *name* decl(str or Decls)): *declarations* text(str): text to be parsed. If `text` is *not* ``None``, other attributes are overridden by the result of :meth:`.parse` or left untouced.. """ if text: _debug_write(f'{self.__class__.__name__}' f"text='{text}'") self.parse(text) else: self.name = name if decl is None: self.decl = None elif isinstance(decl, dict): self.decl = decl elif type(decl) is str: self.decl = parse_declarations(decl) else: raise TypeError(f'decl={decl} is invalid type: {type(decl)}')
[docs] def parse(self, text): """ Parse `text` to construct :class:`Struct`. If given `text` is not valid for each subclass, the instance is left as 'null' instance. """ _debug_write(f'{self.__class__.__name__}.parse: text="{text}"') res = _pat_struct.match(text) if not res: return None _debug_write(f'{self.__class__.__name__}.parse:name="{res.group(3)}"') _debug_write(f'{self.__class__.__name__}.parse:decl="{res.group(2)}"') if self.stype and self.stype == SType(res.group(1)): self.decl = parse_declarations(res.group(2)) self.name = res.group(3)
def __getattr__(self, key): # print('__getattr__() called') if key in self.decl: return self.decl[key] else: raise AttributeError( f"'{self.__class__.__name__}' object has no attribute '{key}'") def __getitem__(self, key): # print('__getitem__() called') if key in self.decl: return self.decl[key] else: raise KeyError(f"'{key}'") def __contains__(self, item): # print('__contains__() called') return (item in self.__dict__) or (item in self.decl) def __repr__(self): if self.name: name = f"'{self.name}'" else: name = '' if self.decl: # decl = f'decl={self.decl.__repr__()}' decl = f'{self.decl.__repr__()}' else: decl = '' res = ', '.join([l for l in [name, decl] if l]) return (f'{self.__class__.__name__}({res})') def __str__(self): return self.text_formatted()
[docs] def text_formatted(self, indent=4, linebreak=True): """ Return formatted text. """ _debug_write( f'{self.__class__.__name__}.text_formatted:indent={indent},linebreak={linebreak}' ) if self.name: name = self.name + ';' else: name = '' if self.stype: stype = f'{self.stype.name}' else: stype = '' if self.decl: if linebreak: lb = '\n' else: lb = '' decl = f'{lb}'.join([ self.decl[d].text_formatted(indent, linebreak) for d in self.decl if d ]) decl = tw.indent(decl, ' ' * indent) decl = f'{lb}'.join(('{', decl, '}')) else: decl = '' if name == '' and decl == '': res = '' else: res = ' '.join([l for l in [stype, decl, name] if l]) return res
@property def text(self): """ Text to construct this instance. """ return self.text_formatted(indent=0, linebreak=False)
[docs]class Dataset(Struct): """ Class for *Dataset*. See :class:`Struct`. """ stype = SType.Dataset def __init__(self, name='', decl=None, text=None): super().__init__(name, decl=decl) if text: super().__init__(text=text)
[docs]class Structure(Struct): """ Class for *Structure*. See :class:`Struct`. """ stype = SType.Structure def __init__(self, name='', decl=None, text=None): super().__init__(name, decl=decl) if text: super().__init__(text=text)
[docs]class Sequence(Struct): """ Class for *Sequence*. See :class:`Struct`. Examples: >>> text = ''' ... Sequence { ... Float64 depth; ... Float64 salinity; ... Float64 oxygen; ... Float64 temperature; ... } cast;''' >>> Sequence(text=text) Sequence('cast', {'depth': Var('depth', 'Float64'), 'salinity': Var('salinity', 'Float64'), 'oxygen': Var('oxygen', 'Float64'), 'temperature': Var('temperature', 'Float64')}) """ stype = SType.Sequence def __init__(self, name='', decl=None, text=None): super().__init__(name, decl=decl) if text: super().__init__(text=text)
[docs]class Grid(Struct): """ Class for *Grid*. | *Grid* := Grid { ARRAY: *declaration* MAPS: *declarations* } (*name* | *name* *arr*) Attributes: name(str): *name* stype(SType): *stype* array(Decl): ARRAY *declaration* maps(Decls): MAPS *declarations* Examples: >>> text = ''' ... Grid { ... ARRAY: ... Float32 tas[time = 8412][lat = 160][lon = 320]; ... MAPS: ... Float64 time[time = 8412]; ... Float64 lat[lat = 160]; ... Float64 lon[lon = 320]; ... } tas;''' >>> Grid(text=text) Grid('tas', array=Var('tas', 'Float32', arr=[Arr('time', 8412), Arr('lat', 160), Arr('lon', 320)]), maps={'time': Var('time', 'Float64', arr=[Arr('time', 8412)]), 'lat': Var('lat', 'Float64', arr=[Arr('lat', 160)]), 'lon': Var('lon', 'Float64', arr=[Arr('lon', 320)])}) """ stype = SType.Grid def __init__(self, name='', array=None, maps=None, text=None): """ Parameters: name(str): *name* stype(str or SType): *stype* array(Decl): ARRAY *declaration* maps(Decls): MAPS *declarations* text(str): text to be parsed. If `text` is not ``None``, other attributes are overridden by the result of :meth:`.parse`. """ super().__init__(name, decl=None) self.array = array self.maps = maps if text: self.parse(text)
[docs] def parse(self, text): """ Parse `text` to construct :class:`Grid`. """ _debug_write(f"{self.__class__.__name__}.parse: text='{text}'") res = _pat_grid.match(text) if res: _debug_write( f"{self.__class__.__name__}.parse: array_line='{res.group(1).strip()}'" ) _debug_write( f"{self.__class__.__name__}.parse: maps_line='{res.group(2).strip()}'" ) self.array = Var(text=res.group(1)) self.maps = parse_declarations(res.group(2)) self.name = res.group(3)
def __getattr__(self, key): # print('__getattr__() called') if key == self.array.name: return self.array elif key in self.maps: return self.maps[key] else: raise AttributeError( f"'{self.__class__.__name__}' object has no attribute '{key}'") def __getitem__(self, key): # print('__getitem__() called') if key == self.array.name: return self.array elif key in self.maps: return self.maps[key] else: raise KeyError(f"'{key}'") def __contains__(self, item): # print('__contains__() called') return (item in self.__dict__) or (item in self.maps) or ( item == self.array.name) def __repr__(self): if self.name: name = f"'{self.name}'" else: name = '' if self.array: array = f'array={self.array.__repr__()}' else: array = '' if self.maps: maps = f'maps={self.maps.__repr__()}' else: maps = '' res = ', '.join([l for l in [name, array, maps] if l]) return (f'{self.__class__.__name__}({res})') def __str__(self): return self.text_formatted()
[docs] def text_formatted(self, indent=4, linebreak=True): """ Return formatted text. """ _debug_write( f'{self.__class__.__name__}.text_formatted:indent={indent},linebreak={linebreak}' ) if self.name: name = self.name + ';' else: name = '' if self.stype: stype = f'{self.stype.name}' else: stype = '' if self.array is None or self.maps is None: decl = '' else: if linebreak: lb = '\n' else: lb = '' array = f' ARRAY:{lb}' + tw.indent(self.array.text, ' ' * indent) ll = f'{lb}'.join([ self.maps[d].text_formatted(indent, linebreak) for d in self.maps if d ]) maps = f' MAPS:{lb}' + tw.indent(ll, ' ' * indent) decl = f'{lb}'.join(('{', array, maps, '}')) if name == '' and decl == '': res = '' else: res = ' '.join([l for l in [stype, decl, name] if l]) return res
@property def text(self): """ Text to construct this instance. """ return self.text_formatted(indent=0, linebreak=False)
[docs]class Var(Decl): """ Class for *Var*. | *Var* := *basetype* (*name*|*name* *arr*) Attributes: name (str): *name* btype (BType): *basetype* arr (list(Arr)): *array-decl* """ def __init__(self, name='', btype=None, arr=None, text=None): """ Parameters: name(str): *name* btype(str or BType): *basetype* arr(Arr or list(Arr)): *array-decl* text(str): text to be parsed Raises: TypeError: if `btype` or `arr` is invalid If `text` is not ``None``, other attributes are overridden by the result of :meth:`.parse`. """ self.name = name if btype is None: self.btype = btype elif isinstance(btype, BType): self.btype = btype elif type(btype) is str: self.btype = BType(btype) else: raise TypeError(f'btype={btype} is invalid type: {type(btype)}') if arr is None or arr == []: self.arr = None elif isinstance(arr, Arr): self.arr = arr elif type(arr) is list and isinstance(arr[0], Arr): self.arr = arr elif isinstance(arr, str): self.arr = parse_arrdecls(arr) else: raise TypeError(f'arr={arr} is invalid type: {type(arr)}') if text: self.parse(text)
[docs] def parse(self, text): """ Parse `text` to construct :class:`Var`. """ _debug_write(f'Var.parse():text="{text[:60]}"') res = _pat_varline.match(text) if res: try: self.btype = BType(res.group(1)) except ValueError: return None self.name = res.group(2) if res.group(3): self.arr = parse_arrdecls(res.group(3))
def __repr__(self): if self.name == '': name = '' else: name = f"'{self.name}'" if self.btype is None: btype = '' else: btype = f"'{self.btype.name}'" if self.arr: arr = 'arr=' + str([a for a in self.arr]) else: arr = '' args = ', '.join([elem for elem in [name, btype, arr] if elem != '']) return f'Var({args})' def __str__(self): return self.text_formatted()
[docs] def text_formatted(self, indent=None, linebreak=None): """ Formatted text expression of this instance. `indent` and `linebreak` are dummy arguments here. """ if self.btype is None: res = '' else: res = f'{self.btype.name}' if self.name != '': res += f' {self.name}' if self.arr: res += ''.join([a.text for a in self.arr]) if res: res += ';' return res
@property def text(self): """ Text to construct this instance. """ return self.text_formatted()
[docs]class Arr(): """ Class for *arr*. | *arr* := [integer] | [*name* = integer] As a text form:: text = '[time = 8412]' text = '[500]' Example: >>> text = '[lat = 160];' >>> Arr(text=text) Arr('lat', 160) >>> text = '[500];' >>> Arr(text=text) Arr('', 500) Attributes: name (str) : *name* val (int) : integer """ def __init__(self, name='', val=None, text=None): self.name = name self.val = val if text: self.parse(text)
[docs] def parse(self, text): _debug_write(f"{self.__class__.__name__}.parse():text='{text}'") res = _pat_arrdecl.match(text) if res: self.name = res.group(1) self.val = int(res.group(2)) else: res = _pat_arrdecl_valonly.match(text) if res: self.val = int(res.group(1)) _debug_write( f"{self.__class__.__name__}.parse():name='{self.name}',val='{self.val}'" )
def __eq__(self, other): if type(other) is not type(self): return False res = [getattr(self, a) == getattr(other, a) for a in self.__dict__] return all(res) def __repr__(self): if self.name: return f"Arr('{self.name}', {self.val})" elif self.val: return f"Arr('', {self.val})" else: return '' def __str__(self): if self.name: return f"Arr(name='{self.name}', val={self.val})" elif self.val: return f"[{self.val}]" else: return ''
[docs] def text_formatted(self, indent=None, linebreak=None): """ Text form of *arr*. `indent` and `linebreak` are dummy here. """ if self.name: return f"[{self.name} = {self.val}]" elif self.val: return f"[{self.val}]" else: return ''
@property def text(self): return self.text_formatted()
[docs]def check_braces_matching(text): """ Check if braces(``{`` and ``}``) in given `text` match. Raises `ValueError` unless match. Examples: >>> text = 'Dataset{varline} hoge' >>> check_braces_matching(text) # True >>> text = 'Struct{ Sequence{Var} fuga }} hoge' >>> check_braces_matching(text) Traceback (most recent call last): ... ValueError: braces do not match: too many right braces: 1 more. >>> text = 'Struct{ Sequence{{Var} fuga } hoge' >>> check_braces_matching(text) Traceback (most recent call last): ... ValueError: braces do not match: too many left braces: 1 more. """ count = 0 maxcount = 0 _debug_write('check_braces_matching:') for n, c in enumerate(text): if c == '{': count += 1 maxcount = max(maxcount, count) _debug_write(f'n={n}, count={count}') if c == '}': count -= 1 _debug_write(f'n={n}, count={count}') if (count < 0): raise ValueError(f'braces do not match: ' f'too many right braces: {abs(count)} more.') if count > 0: raise ValueError(f'braces do not match: ' f'too many left braces: {count} more.')
[docs]def parse_dataset(text): """ Parse toplevel *dataset*. *dataset* := Dataset { *declarations* } *name*; """ check_braces_matching(text) # Dataset is the toplevel, *greedy* is preferable. res = _pat_dataset.match(text) if res: dataset = Dataset(text=text) else: raise ValueError('Given text is not the Dataset definition.') return dataset
[docs]def parse_declarations(text): """ Return :class:`Decls`, dict of {`name`: *Decl*} parsed from `text`. """ # _debug_write(f'parse_declarations:text="{text}"') # _debug_write('======parse_declarations======') res = Decls() while text != '': _debug_write('=' * 20) _debug_write(f"parse_declarations:text='{text}'") res_ident = _pat_ident.match(text) if res_ident: ident = res_ident.group(1) _debug_write(f"parse_declarations:ident:'{ident}'") if ident in _idents_stype: ss, rest = pop_struct(text) # res.append(ss) res[ss.name] = ss text = rest.strip() elif ident in _idents_btype: vl, rest = pop_varline(text) # res.append(vl) res[vl.name] = vl text = rest.strip() else: return None return res
[docs]def pop_struct(text): """ Pop one :class:`Struct`-derived instance parsed from the first part of `text`, return it and the rest of `text`. """ leftpos = text.find('{') if (leftpos < 0): # no braces, no Struct instance. return None nestlevel = 0 for n, c in enumerate(text[leftpos:]): if c == '{': nestlevel += 1 if c == '}': nestlevel -= 1 if nestlevel == 0: rightpos = leftpos + n break lastdelim = rightpos + text[rightpos:].find(';') + 1 _debug_write(f"parse_struct:lastdelim='{lastdelim}'") sline = text[:lastdelim].strip() rest = text[lastdelim:].strip() res = _pat_idents_stype.match(text) if res: ident = res.group(1) _debug_write(f"parse_struct:ident='{ident}'") _debug_write(f"parse_struct:sline='{sline}'") if ident == 'Grid': ss = Grid(text=sline) elif ident == 'Dataset': ss = Dataset(text=sline) elif ident == 'Structure': ss = Structure(text=sline) elif ident == 'Sequence': ss = Sequence(text=sline) else: raise ValueError('Invalid text') return ss, rest
[docs]def pop_varline(text): """ Pop one :class:`Var` instance parsed from the first part of `text`, return it and rest of the `text`. """ _debug_write(f"pop_varline:text='{text}'") pat_split = re.compile(r' *(.+?;) *(.*)', re.DOTALL) res = pat_split.match(text) vline = res.group(1) try: rest = res.group(2).strip() except AttributeError: rest = '' _debug_write(f"pop_varline:vline='{vline}'") _debug_write(f"pop_varline:rest='{rest}'") vl = Var(text=vline) return vl, rest
[docs]def parse_arrdecls(text): """ Parse `text` contains multiple :class:`Arr` definitions and return a list of them. """ _debug_write(f"parse_arrdecls:text='{text}'") res = _pat_arrdecl_line.findall(text) if res: return [Arr(text=l) for l in res] else: return None
# for debug use... _sample1 = ''' Dataset { Float64 lat[lat = 160]; Float64 lat_bnds[lat = 160][bnds = 2]; Float64 lon[lon = 320]; Float64 lon_bnds[lon = 320][bnds = 2]; Float64 height; Float64 time[time = 8412]; Float64 time_bnds[time = 8412][bnds = 2]; Grid { ARRAY: Float32 tas[time = 8412][lat = 160][lon = 320]; MAPS: Float64 time[time = 8412]; Float64 lat[lat = 160]; Float64 lon[lon = 320]; } tas; } CMIP6.CMIP.MRI.MRI-ESM2-0.piControl.r1i1p1f1.Amon.tas.gn.tas.20190222.aggregation.1; ''' _sample1_struct = Dataset( 'CMIP6.CMIP.MRI.MRI-ESM2-0.piControl.r1i1p1f1.Amon.tas.gn.tas.20190222.aggregation.1', { 'lat': Var('lat', 'Float64', arr=[Arr('lat', 160)]), 'lat_bnds': Var('lat_bnds', 'Float64', arr=[Arr('lat', 160), Arr('bnds', 2)]), 'lon': Var('lon', 'Float64', arr=[Arr('lon', 320)]), 'lon_bnds': Var('lon_bnds', 'Float64', arr=[Arr('lon', 320), Arr('bnds', 2)]), 'height': Var('height', 'Float64'), 'time': Var('time', 'Float64', arr=[Arr('time', 8412)]), 'time_bnds': Var('time_bnds', 'Float64', arr=[Arr('time', 8412), Arr('bnds', 2)]), 'tas': Grid('tas', array=Var( 'tas', 'Float32', arr=[Arr('time', 8412), Arr('lat', 160), Arr('lon', 320)]), maps=Decls({ 'time': Var('time', 'Float64', arr=[Arr('time', 8412)]), 'lat': Var('lat', 'Float64', arr=[Arr('lat', 160)]), 'lon': Var('lon', 'Float64', arr=[Arr('lon', 320)]) })) }) _sample2 = ''' Dataset { Int32 catalog_number; Sequence { String experimenter; Int32 time; Structure { Float64 latitude; Float64 longitude; } location; Sequence { Float64 depth; Float64 salinity; Float64 oxygen; Float64 temperature; } cast; } station; } data; ''' _sample2_struct = Dataset( 'data', { 'catalog_number': Var('catalog_number', 'Int32'), 'station': Sequence( 'station', { 'experimenter': Var('experimenter', 'String'), 'time': Var('time', 'Int32'), 'location': Structure( 'location', { 'latitude': Var('latitude', 'Float64'), 'longitude': Var('longitude', 'Float64') }), 'cast': Sequence( 'cast', { 'depth': Var('depth', 'Float64'), 'salinity': Var('salinity', 'Float64'), 'oxygen': Var('oxygen', 'Float64'), 'temperature': Var('temperature', 'Float64') }) }) }) _sample3 = '''\ Dataset { Structure { Float64 lat; Float64 lon; } location; Structure { Int32 minutes; Int32 day; Int32 year; } time; Float64 depth[500]; Float64 temperature[500]; } xbt-station; ''' _sample3_struct = Dataset( 'xbt-station', { 'location': Structure('location', { 'lat': Var('lat', 'Float64'), 'lon': Var('lon', 'Float64') }), 'time': Structure( 'time', { 'minutes': Var('minutes', 'Int32'), 'day': Var('day', 'Int32'), 'year': Var('year', 'Int32') }), 'depth': Var('depth', 'Float64', arr=[Arr('', 500)]), 'temperature': Var('temperature', 'Float64', arr=[Arr('', 500)]) }) # _enable_debug() #_disable_debug() def _test_mod(): import doctest doctest.testmod() if __name__ == '__main__': _test_mod()