跳转至

Data.dataset(数据集) 模块

ppsci.data.dataset

CGCNNDataset

Bases: Dataset

The CIFData dataset is a wrapper for a dataset where the crystal structures are stored in the form of CIF files. The dataset should have the following directory structure:

root_dir ├── id_prop.csv ├── atom_init.json ├── id0.cif ├── id1.cif ├── ...

id_prop.csv: a CSV file with two columns. The first column records a unique ID for each crystal, and the second column records the value of the target property.

atom_init.json: a JSON file that stores the initialization vector for each element.

ID.cif: a CIF file that records the crystal structure, where ID is the unique ID for the crystal.

Args root_dir (str): The path to the root directory of the dataset max_num_nbr (int): The maximum number of neighbors while constructing the crystal graph radius (float): The cutoff radius for searching neighbors dmin (float): The minimum distance for constructing GaussianDistance step (float): The step size for constructing GaussianDistance random_seed (int): Random seed for shuffling the dataset

Returns atom_fea (paddle.Tensor): Shape (n_i, atom_fea_len) nbr_fea (paddle.Tensor): Shape (n_i, M, nbr_fea_len) nbr_fea_idx (paddle.Tensor): Shape (n_i, M) target (paddle.Tensor): Shape (1, ) cif_id (str or int)

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.CGCNNDataset(
...     root_dir="/path/to/CGCNNDataset",
...     input_keys=("i",),
...     label_keys=("l",),
...     id_keys=("c",),
... )
Source code in ppsci/data/dataset/cgcnn_dataset.py
class CIFData(io.Dataset):
    """
    Dataset wrapping crystal structures that are stored as CIF files.

    The dataset directory should have the following structure:

    root_dir
    ├── id_prop.csv
    ├── atom_init.json
    ├── id0.cif
    ├── id1.cif
    ├── ...

    id_prop.csv: a CSV file with two columns. The first column records a
    unique ID for each crystal, and the second column records the value of
    the target property.

    atom_init.json: a JSON file that stores the initialization vector for each
    element.

    ID.cif: a CIF file that records the crystal structure, where ID is the
    unique ID for the crystal.

    Every sample is loaded and converted once in ``__init__`` and kept in
    ``self.raw_data``; ``__getitem__`` only wraps the stored arrays in dicts.

    Note:
        ``__init__`` calls ``random.seed(random_seed)``, i.e. it reseeds the
        global ``random`` module before shuffling the rows of id_prop.csv.

    Args:
        root_dir (str): The path to the root directory of the dataset.
        input_keys (Tuple[str, ...]): Input keys; only the first one is used
            as the key of the input dict returned by ``__getitem__``.
        label_keys (Tuple[str, ...]): Label keys; only the first one is used
            as the key of the label dict returned by ``__getitem__``.
        id_keys (Tuple[str, ...]): ID keys; only the first one is used as the
            key of the ID dict returned by ``__getitem__``.
        max_num_nbr (int): The maximum number of neighbors while constructing
            the crystal graph. Defaults to 12.
        radius (float): The cutoff radius for searching neighbors. Defaults to 8.
        dmin (float): The minimum distance for constructing GaussianDistance.
            Defaults to 0.
        step (float): The step size for constructing GaussianDistance.
            Defaults to 0.2.
        random_seed (int): Random seed for shuffling the dataset. Defaults to 123.

    Returns (per sample, as produced by ``get``):
        atom_fea (np.ndarray): One row of features per atom, shape (n_i, atom_fea_len).
        nbr_fea (np.ndarray): Expanded neighbor distances, presumably of shape
            (n_i, M, nbr_fea_len) -- depends on ``GaussianDistance.expand``.
        nbr_fea_idx (np.ndarray): int64 neighbor indices, shape (n_i, M) with
            M == max_num_nbr.
        target (np.ndarray): float32 array of shape (1, ).
        cif_id (str): The crystal ID read from id_prop.csv.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.CGCNNDataset(
        ...     root_dir="/path/to/CGCNNDataset",
        ...     input_keys=("i",),
        ...     label_keys=("l",),
        ...     id_keys=("c",),
        ... )  # doctest: +SKIP
    """

    def __init__(
        self,
        root_dir: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        id_keys: Tuple[str, ...],
        max_num_nbr: int = 12,
        radius: float = 8,
        dmin: float = 0,
        step: float = 0.2,
        random_seed: int = 123,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.id_keys = id_keys
        self.root_dir = root_dir
        self.max_num_nbr, self.radius = max_num_nbr, radius
        assert os.path.exists(root_dir), "root_dir does not exist!"
        id_prop_file = os.path.join(self.root_dir, "id_prop.csv")
        assert os.path.exists(id_prop_file), "id_prop.csv does not exist!"
        # The csv module expects files opened with newline="" so that it can
        # handle line endings (including those inside quoted fields) itself.
        with open(id_prop_file, newline="") as f:
            self.id_prop_data = list(csv.reader(f))
        # Deterministic shuffle of the (cif_id, target) rows.
        random.seed(random_seed)
        random.shuffle(self.id_prop_data)
        atom_init_file = os.path.join(self.root_dir, "atom_init.json")
        assert os.path.exists(atom_init_file), f"{atom_init_file} does not exist!"
        self.ari = AtomCustomJSONInitializer(atom_init_file)
        self.gdf = GaussianDistance(dmin=dmin, dmax=self.radius, step=step)
        # Eagerly parse every CIF file once; __getitem__ reads from this list.
        self.raw_data = [self.get(i) for i in range(len(self))]

    def __len__(self):
        """Return the number of rows read from id_prop.csv."""
        return len(self.id_prop_data)

    def __getitem__(self, idx):
        """Return ``(input_dict, label_dict, id_dict)`` for sample ``idx``.

        No memoization is applied here: the samples are already materialized
        in ``self.raw_data``, and an ``lru_cache`` on an instance method would
        key on ``self`` and keep the whole dataset alive for as long as the
        cache exists.
        """
        features, target, cif_id = self.raw_data[idx]
        return (
            {self.input_keys[0]: features},
            {self.label_keys[0]: target},
            {self.id_keys[0]: cif_id},
        )

    def get(self, idx):
        """Load the crystal of row ``idx`` and build its graph features.

        Args:
            idx (int): Row index into the shuffled id_prop.csv data.

        Returns:
            Tuple: ``((atom_fea, nbr_fea, nbr_fea_idx), target, cif_id)``.
        """
        cif_id, target = self.id_prop_data[idx]
        crystal = Structure.from_file(os.path.join(self.root_dir, cif_id + ".cif"))
        atom_fea = np.vstack(
            [
                self.ari.get_atom_fea(crystal[i].specie.number)
                for i in range(len(crystal))
            ]
        )
        # NOTE(review): this tensor round trip is undone by np.array() below;
        # kept as-is because it may affect the resulting dtype -- confirm.
        atom_fea = paddle.Tensor(atom_fea)
        all_nbrs = crystal.get_all_neighbors(self.radius, include_index=True)
        # Sort each atom's neighbors by element 1 of the neighbor record
        # (presumably the distance; element 2 presumably the site index --
        # verify against the pymatgen neighbor layout).
        all_nbrs = [sorted(nbrs, key=lambda x: x[1]) for nbrs in all_nbrs]
        nbr_fea_idx, nbr_fea = [], []
        for nbr in all_nbrs:
            if len(nbr) < self.max_num_nbr:
                warnings.warn(
                    f"{cif_id} not find enough neighbors to build graph. "
                    "If it happens frequently, consider increase "
                    "radius."
                )
                # Pad up to max_num_nbr: index 0 and a value just beyond the
                # cutoff radius for the missing neighbors.
                n_pad = self.max_num_nbr - len(nbr)
                nbr_fea_idx.append([x[2] for x in nbr] + [0] * n_pad)
                nbr_fea.append([x[1] for x in nbr] + [self.radius + 1.0] * n_pad)
            else:
                # Keep only the first max_num_nbr neighbors in sorted order.
                kept = nbr[: self.max_num_nbr]
                nbr_fea_idx.append([x[2] for x in kept])
                nbr_fea.append([x[1] for x in kept])
        nbr_fea_idx, nbr_fea = np.array(nbr_fea_idx), np.array(nbr_fea)
        nbr_fea = self.gdf.expand(nbr_fea)
        atom_fea = np.array(atom_fea)
        nbr_fea = np.array(nbr_fea)
        nbr_fea_idx = np.array(nbr_fea_idx, dtype="int64")
        target = np.array([float(target)], dtype="float32")
        return (atom_fea, nbr_fea, nbr_fea_idx), target, cif_id

ChipHeatDataset

Bases: Dataset

ChipHeatDataset for data loading of multi-branch DeepONet model.

Parameters:

Name Type Description Default
input Dict[str, ndarray]

Input dict.

required
label Dict[str, ndarray]

Label dict.

required
index tuple[str, ...]

Key of input dict.

required
data_type str

One of key of input dict.

required
weight Optional[Dict[str, ndarray]]

Weight dict. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import ppsci
>>> import numpy as np
>>> input = {"x": np.random.randn(100, 1)}
>>> label = {"u": np.random.randn(100, 1)}
>>> index = ('x', 'u', 'bc', 'bc_data')
>>> data_type = 'u'
>>> weight = {"u": np.random.randn(100, 1)}
>>> dataset = ppsci.data.dataset.ChipHeatDataset(input, label, index, data_type, weight)
Source code in ppsci/data/dataset/array_dataset.py
class ChipHeatDataset(io.Dataset):
    """ChipHeatDataset for data loading of multi-branch DeepONet model.

    A flat sample index is decomposed (mixed-radix, first key of ``index``
    varying fastest) into one position per key listed in ``index``; the dataset
    length is the product of ``len(input[key])`` over those keys.

    Args:
        input (Dict[str, np.ndarray]): Input dict.
        label (Dict[str, np.ndarray]): Label dict. It is returned as a whole
            (not indexed) with every sample.
        index (tuple[str, ...]): Keys of the input dict that are enumerated by
            the flat sample index. Every key must exist in ``input``.
        data_type (str): One of the keys of ``index``; used to locate the row of
            the special input key ``"u_one"``.
        weight (Optional[Dict[str, np.ndarray]]): Weight dict, returned as a
            whole with every sample. Defaults to None (empty dict).
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). It is called with the three dicts as positional
            arguments. Defaults to None.

    Examples:
        >>> import ppsci
        >>> import numpy as np
        >>> input = {"x": np.random.randn(100, 1), "u": np.random.randn(10, 1)}
        >>> label = {"T": np.random.randn(100, 1)}
        >>> index = ("x", "u")
        >>> data_type = "u"
        >>> weight = {"T": np.random.randn(100, 1)}
        >>> dataset = ppsci.data.dataset.ChipHeatDataset(input, label, index, data_type, weight)
        >>> len(dataset)
        1000
    """

    def __init__(
        self,
        input: Dict[str, np.ndarray],
        label: Dict[str, np.ndarray],
        index: tuple[str, ...],
        data_type: str,
        weight: Optional[Dict[str, float]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input = input
        self.label = label
        self.input_keys = tuple(input.keys())
        self.label_keys = tuple(label.keys())
        self.index = index
        self.data_type = data_type
        self.weight = {} if weight is None else weight
        self.transforms = transforms

    def __getitem__(self, idx):
        """Return ``(input_item, label_item, weight_item)`` for flat index ``idx``."""
        # Decompose idx into one position per indexed key.
        index_ir = dict.fromkeys(self.index, 0)
        quotient = idx
        for key in index_ir:
            num = len(self.input[key])
            index_ir[key] = quotient % num
            quotient = quotient // num

        input_item = {}
        for key in self.input:
            if key == "y":
                # "y" shares its position with "x".
                input_item[key] = self.input[key][index_ir["x"]]
            elif key == "u_one":
                # "u_one" is addressed by a flattened (x, data_type) position.
                input_item[key] = self.input[key][
                    len(self.input[self.data_type]) * index_ir["x"]
                    + index_ir[self.data_type]
                ]
            else:
                input_item[key] = self.input[key][index_ir[key]]

        # Labels and weights are passed through unindexed (shallow copies).
        label_item = dict(self.label)
        weight_item = dict(self.weight)

        if self.transforms is not None:
            # Pass the three dicts as separate positional arguments, matching
            # how the other datasets in this package invoke their transforms.
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return (input_item, label_item, weight_item)

    def __len__(self):
        """Return the product of the lengths of all indexed input arrays."""
        _len = 1
        for key in self.index:
            _len *= len(self.input[key])
        return _len

ContinuousNamedArrayDataset

Bases: IterableDataset

ContinuousNamedArrayDataset for iterable sampling.

Parameters:

Name Type Description Default
input Callable

Function generate input dict.

required
label Callable

Function generate label dict.

required
weight Optional[Callable]

Function generate weight dict. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import ppsci
>>> import numpy as np
>>> input = lambda : {"x": np.random.randn(100, 1)}
>>> label = lambda inp: {"u": np.random.randn(100, 1)}
>>> weight = lambda inp, label: {"u": 1 - (label["u"] ** 2)}
>>> dataset = ppsci.data.dataset.ContinuousNamedArrayDataset(input, label, weight)
>>> input_batch, label_batch, weight_batch = next(iter(dataset))
>>> print(input_batch["x"].shape)
[100, 1]
>>> print(label_batch["u"].shape)
[100, 1]
>>> print(weight_batch["u"].shape)
[100, 1]
Source code in ppsci/data/dataset/array_dataset.py
class ContinuousNamedArrayDataset(io.IterableDataset):
    """Iterable dataset that generates each batch by calling user functions.

    Every iteration step calls ``input()`` to get the input dict, then
    ``label(input_dict)`` to get the label dict and, if given,
    ``weight(input_dict, label_dict)`` to get the weight dict. The iterator
    never terminates on its own.

    Args:
        input (Callable): Function generate input dict.
        label (Callable): Function generate label dict.
        weight (Optional[Callable]): Function generate weight dict. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> import numpy as np
        >>> input = lambda : {"x": np.random.randn(100, 1)}
        >>> label = lambda inp: {"u": np.random.randn(100, 1)}
        >>> weight = lambda inp, label: {"u": 1 - (label["u"] ** 2)}
        >>> dataset = ppsci.data.dataset.ContinuousNamedArrayDataset(input, label, weight)
        >>> input_batch, label_batch, weight_batch = next(iter(dataset))
        >>> print(input_batch["x"].shape)
        [100, 1]
        >>> print(label_batch["u"].shape)
        [100, 1]
        >>> print(weight_batch["u"].shape)
        [100, 1]
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        input: Callable,
        label: Callable,
        weight: Optional[Callable] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_fn = input
        self.label_fn = label
        self.weight_fn = weight
        self.transforms = transforms

        # Probe the generators once to discover the key names. The input
        # function is deliberately called twice, exactly as many times as the
        # key discovery needs a fresh sample.
        self.input_keys = tuple(self.input_fn().keys())
        probe_input = self.input_fn()
        self.label_keys = tuple(self.label_fn(probe_input).keys())

        self.world_size_ = dist.get_world_size()
        self.rank_ = dist.get_rank()

    @property
    def num_samples(self):
        """Number of samples within current dataset."""
        raise NotImplementedError(
            "ContinuousNamedArrayDataset has no fixed number of samples."
        )

    def __iter__(self):
        """Yield ``(input, label, weight)`` tensor dicts forever."""

        def _as_tensors(batch):
            # None (e.g. no weight function) is passed through untouched.
            if batch is None:
                return None
            return {name: paddle.to_tensor(arr) for name, arr in batch.items()}

        while True:
            inputs = self.input_fn()
            labels = self.label_fn(inputs)
            weights = (
                self.weight_fn(inputs, labels) if callable(self.weight_fn) else None
            )

            if callable(self.transforms):
                inputs, labels, weights = self.transforms(inputs, labels, weights)

            if self.world_size_ > 1:
                # Hand each of the three dicts to the rank-grouping helper.
                inputs, labels, weights = [
                    _group_dict_into_local_rank(batch, self.rank_, self.world_size_)
                    for batch in (inputs, labels, weights)
                ]

            yield _as_tensors(inputs), _as_tensors(labels), _as_tensors(weights)

    def __len__(self):
        """Always report a length of 1."""
        return 1

num_samples property

Number of samples within current dataset.

CSVDataset

Bases: Dataset

Dataset class for .csv file.

Parameters:

Name Type Description Default
file_path str

CSV file path.

required
input_keys Tuple[str, ...]

List of input keys.

required
label_keys Tuple[str, ...]

List of label keys.

required
alias_dict Optional[Dict[str, str]]

Dict of alias(es) for input and label keys. i.e. {inner_key: outer_key}. Defaults to None.

None
weight_dict Optional[Dict[str, Union[Callable, float]]]

Define the weight of each constraint variable. Defaults to None.

None
timestamps Optional[Tuple[float, ...]]

The number of repetitions of the data in the time dimension. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.CSVDataset(
...     "/path/to/file.csv",
...     ("x",),
...     ("u",),
... )
Source code in ppsci/data/dataset/csv_dataset.py
class CSVDataset(io.Dataset):
    """Dataset class for .csv file.

    When ``timestamps`` is given and the loaded data has a ``"t"`` column, only
    rows whose ``t`` is close (``np.isclose``) to one of the timestamps are
    kept, grouped in the order of ``timestamps``. When there is no ``"t"``
    column, the data is combined with the timestamps via
    ``misc.combine_array_with_time`` and ``"t"`` is prepended to the input keys.

    When ``weight_dict`` is given, every label key first gets an all-ones
    weight shaped like the first label array; each ``weight_dict`` entry then
    overrides its key with a constant array (numbers) or with the result of
    calling it on the input dict (callables).

    Args:
        file_path (str): CSV file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...]): List of label keys.
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            i.e. {inner_key: outer_key}. Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): The number of repetitions of the data
            in the time dimension. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Raises:
        NotImplementedError: If a value of ``weight_dict`` is neither a number
            nor a callable.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.CSVDataset(
        ...     "/path/to/file.csv",
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = True

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # Load the requested columns from disk.
        raw_data = reader.load_csv_file(file_path, input_keys + label_keys, alias_dict)

        if timestamps is not None:
            if "t" in raw_data:
                # Select the rows that match each requested timestamp.
                time_column = raw_data["t"]
                row_ids = [
                    np.nonzero(np.isclose(time_column, ti).flatten())[0]
                    for ti in timestamps
                ]
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                row_ids = np.concatenate(row_ids, 0)
                raw_data = misc.convert_to_dict(
                    stacked[row_ids], self.input_keys + self.label_keys
                )
            else:
                # No time column: combine the data with the given timestamps
                # and expose "t" as the first input key.
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                stacked = misc.combine_array_with_time(stacked, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    stacked, self.input_keys + self.label_keys
                )

        # Split the loaded columns into inputs and labels.
        self.input = {k: v for k, v in raw_data.items() if k in self.input_keys}
        self.label = {k: v for k, v in raw_data.items() if k in self.label_keys}

        def _first_label():
            # Looked up lazily so that an empty label dict is only an error
            # when a template array is actually needed.
            return next(iter(self.label.values()))

        # Build the weights.
        self.weight = {}
        if weight_dict is not None:
            self.weight = {k: np.ones_like(_first_label()) for k in self.label}
            for name, spec in weight_dict.items():
                if isinstance(spec, (int, float)):
                    self.weight[name] = np.full_like(_first_label(), spec)
                elif callable(spec):
                    computed = spec(self.input)
                    if isinstance(computed, (int, float)):
                        # A scalar result is broadcast to a full array.
                        computed = np.full_like(_first_label(), computed)
                    self.weight[name] = computed
                else:
                    raise NotImplementedError(f"type of {type(spec)} is invalid yet.")

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    def __getitem__(self, idx):
        """Return the ``(input, label, weight)`` dicts indexed by ``idx``."""
        input_item, label_item, weight_item = (
            {k: v[idx] for k, v in group.items()}
            for group in (self.input, self.label, self.weight)
        )

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return input_item, label_item, weight_item

    def __len__(self):
        """Return the length of the first input array."""
        return self._len

CylinderDataset

Bases: Dataset

Dataset for training Cylinder model.

Parameters:

Name Type Description Default
file_path str

Data set path.

required
input_keys Tuple[str, ...]

Input keys, such as ("states","visc").

required
label_keys Tuple[str, ...]

Output keys, such as ("pred_states", "recover_states").

required
block_size int

Data block size.

required
stride int

Data stride.

required
ndata Optional[int]

Number of data series to use. Defaults to None.

None
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
embedding_model Optional[Arch]

Embedding model. Defaults to None.

None
embedding_batch_size int

The batch size of embedding model. Defaults to 64.

64

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.CylinderDataset(
...     file_path="/path/to/CylinderDataset",
...     input_keys=("states", "visc"),
...     label_keys=("pred_states", "recover_states"),
...     block_size=32,
...     stride=16,
... )
Source code in ppsci/data/dataset/trphysx_dataset.py
class CylinderDataset(io.Dataset):
    """Dataset for training Cylinder model.

    Each HDF5 group is cut into windows of ``block_size`` steps taken every
    ``stride`` steps. If an embedding model is given, all windows are encoded
    once in ``__init__`` and samples are served from the encoded data.

    Args:
        file_path (str): Data set path.
        input_keys (Tuple[str, ...]): Input keys, such as ("states","visc").
        label_keys (Tuple[str, ...]): Output keys, such as ("pred_states", "recover_states").
        block_size (int): Data block size.
        stride (int): Data stride.
        ndata (Optional[int]): Number of data series to use. Defaults to None.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        embedding_model (Optional[base.Arch]): Embedding model. Defaults to None.
        embedding_batch_size (int, optional): The batch size of embedding model. Defaults to 64.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.CylinderDataset(
        ...     file_path="/path/to/CylinderDataset",
        ...     input_keys=("states", "visc"),
        ...     label_keys=("pred_states", "recover_states"),
        ...     block_size=32,
        ...     stride=16,
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        block_size: int,
        stride: int,
        ndata: Optional[int] = None,
        weight_dict: Optional[Dict[str, float]] = None,
        embedding_model: Optional[base.Arch] = None,
        embedding_batch_size: int = 64,
    ):
        if not os.path.exists(file_path):
            raise FileNotFoundError(
                f"file_path({file_path}) not exists. Please download dataset first. "
                "Training: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/cylinder_training.hdf5. "
                "Valid: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/cylinder_valid.hdf5."
            )
        super().__init__()
        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys

        self.block_size = block_size
        self.stride = stride
        self.ndata = ndata

        # Every label key weighs 1.0 unless overridden by the caller.
        self.weight_dict = dict.fromkeys(self.label_keys, 1.0)
        if weight_dict is not None:
            self.weight_dict.update(weight_dict)

        self.data, self.visc = self.read_data(file_path, block_size, stride)
        self.embedding_model = embedding_model
        if embedding_model is None:
            self.embedding_data = None
        else:
            # Encode all windows up front, batch by batch, without gradients.
            embedding_model.eval()
            with paddle.no_grad():
                data_tensor = paddle.to_tensor(self.data)
                visc_tensor = paddle.to_tensor(self.visc)
                total = len(data_tensor)
                encoded_chunks = []
                for lo in range(0, total, embedding_batch_size):
                    hi = min(lo + embedding_batch_size, total)
                    encoded = embedding_model.encoder(
                        data_tensor[lo:hi], visc_tensor[lo:hi]
                    )
                    encoded_chunks.append(encoded.numpy())
                self.embedding_data = np.concatenate(encoded_chunks)

    def read_data(self, file_path: str, block_size: int, stride: int):
        """Read the HDF5 file and cut every series into windows.

        Args:
            file_path (str): Path of the HDF5 file.
            block_size (int): Number of steps per window.
            stride (int): Step between the starts of consecutive windows.

        Returns:
            Tuple[np.ndarray, np.ndarray]: The stacked windows and one
            single-element viscosity row per window.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
        """
        windows = []
        viscosities = []
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"{file_path} does not exist")
        with h5py.File(file_path, "r") as f:
            for series_count, key in enumerate(f.keys(), start=1):
                # The group name is parsed as a number and turned into the
                # viscosity value (presumably it is the Reynolds number -- confirm).
                visc0 = 2.0 / float(key)
                dtype = paddle.get_default_dtype()
                ux, uy, p = (
                    np.asarray(f[key + suffix], dtype=dtype)
                    for suffix in ("/ux", "/uy", "/p")
                )
                data_series = np.stack([ux, uy, p], axis=1)

                starts = range(0, data_series.shape[0] - block_size + 1, stride)
                windows.extend(data_series[s : s + block_size] for s in starts)
                viscosities.extend([visc0] for _ in starts)

                # Stop once the requested number of series has been read.
                if self.ndata is not None and series_count >= self.ndata:
                    break

        windows = np.asarray(windows)
        viscosities = np.asarray(viscosities, dtype=paddle.get_default_dtype())
        return windows, viscosities

    def __len__(self):
        """Return the number of windows."""
        return len(self.data)

    def __getitem__(self, i):
        """Return ``(input_item, label_item, weight_item)`` for window ``i``."""
        if self.embedding_data is not None:
            # Encoded data: inputs are all steps but the last, labels all
            # steps but the first.
            data_item = self.embedding_data[i]
            input_item = {self.input_keys[0]: data_item[:-1, :]}
            label_item = {self.label_keys[0]: data_item[1:, :]}
            if len(self.label_keys) == 2:
                label_item[self.label_keys[1]] = data_item[1:, :]
        else:
            # Raw data: needs two input keys and two label keys.
            data_item = self.data[i]
            input_item = {
                self.input_keys[0]: data_item,
                self.input_keys[1]: self.visc[i],
            }
            label_item = {
                self.label_keys[0]: data_item[1:],
                self.label_keys[1]: data_item,
            }

        # One broadcastable constant per weighted key.
        weight_shape = [1] * len(data_item.shape)
        weight_item = {}
        for key, value in self.weight_dict.items():
            weight_item[key] = np.full(weight_shape, value, paddle.get_default_dtype())
        return (input_item, label_item, weight_item)

DarcyFlowDataset

Bases: Dataset

Loads a small Darcy-Flow dataset

Training contains 1000 samples in resolution 16x16. Testing contains 100 samples at resolution 16x16 and 50 samples at resolution 32x32.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
data_dir str

The directory to load data from.

required
weight_dict Optional[Dict[str, float]]

Define the weight of each constraint variable. Defaults to None.

None
test_resolutions Tuple[int, ...]

The resolutions of the test dataset. Default is [32].

[32]
grid_boundaries Tuple[int, ...]

The boundaries of the grid. Default is [[0,1],[0,1]].

[[0, 1], [0, 1]]
positional_encoding bool

Whether to use positional encoding. Default is True

True
encode_input bool

Whether to encode the input. Default is False

False
encode_output bool

Whether to encode the output. Default is True

True
encoding str

The type of encoding. Default is 'channel-wise'.

'channel-wise'
channel_dim int

The location of unsqueeze, i.e. where to put the channel dimension. Default is 1, which gives the layout (batch, channel, height, width).

1
data_split str

Whether to use training or test dataset. Default is 'train'.

'train'
Source code in ppsci/data/dataset/darcyflow_dataset.py
class DarcyFlowDataset(io.Dataset):
    """Loads a small Darcy-Flow dataset.

    Training contains 1000 samples in resolution 16x16.
    Testing contains 100 samples at resolution 16x16 and
    50 samples at resolution 32x32.

    The constructor eagerly loads ``darcy_train_{train_resolution}.npy`` and the
    two test files ``darcy_test_{res}.npy`` named by ``test_resolutions[0]`` and
    ``test_resolutions[1]`` from ``data_dir``.

    Args:
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        data_dir (str): The directory to load data from.
        weight_dict (Optional[Dict[str, float]], optional): Define the weight of each constraint variable. Defaults to None.
        test_resolutions (Tuple[int, ...]): The two resolutions of the test datasets to load; each must be 16 or 32. Default is (16, 32).
        train_resolution (int): The resolution used to pick the training file. Default is 32.
        grid_boundaries (Tuple[int, ...]): The boundaries of the grid. Default is [[0,1],[0,1]].
        positional_encoding (bool): Whether to use positional encoding. Default is True
        encode_input (bool): Whether to encode the input. Default is False
        encode_output (bool): Whether to encode the output. Default is True
        encoding (str): The type of encoding, 'channel-wise' or 'pixel-wise'. Default is 'channel-wise'.
        channel_dim (int): The axis at which the channel dimension is inserted (unsqueeze). Default is 1.
        data_split (str): Whether to use training or test dataset. 'train' selects the training set,
            'test_16x16' selects the first test set, any other value selects the second test set.
            Default is 'train'.

    Raises:
        ValueError: If a test resolution is not 16 or 32, or if fewer than two
            test resolutions are given.
    """

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        weight_dict: Optional[Dict[str, float]] = None,
        test_resolutions: Tuple[int, ...] = (16, 32),
        train_resolution: int = 32,
        grid_boundaries: Tuple[Tuple[int, ...], ...] = [[0, 1], [0, 1]],
        positional_encoding: bool = True,
        encode_input: bool = False,
        encode_output: bool = True,
        encoding: str = "channel-wise",
        channel_dim: int = 1,
        data_split: str = "train",
    ):
        super().__init__()
        for res in test_resolutions:
            if res not in (16, 32):
                raise ValueError(
                    f"Only 16 and 32 are supported for test resolution, but got {test_resolutions}"
                )
        # Two test files are always loaded below, so fail early with a clear
        # message instead of an IndexError on ``test_resolutions[1]``.
        if len(test_resolutions) < 2:
            raise ValueError(
                f"Two test resolutions are required, but got {test_resolutions}"
            )

        self.input_keys = input_keys
        self.label_keys = label_keys
        self.data_dir = data_dir
        self.weight_dict = {} if weight_dict is None else weight_dict
        if weight_dict is not None:
            # every label key gets weight 1.0 unless the caller overrides it
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        self.test_resolutions = test_resolutions
        self.train_resolution = train_resolution
        self.grid_boundaries = grid_boundaries
        self.positional_encoding = positional_encoding
        self.encode_input = encode_input
        self.encode_output = encode_output
        self.encoding = encoding
        self.channel_dim = channel_dim
        self.data_split = data_split

        # train path
        path_train = (
            Path(self.data_dir)
            .joinpath(f"darcy_train_{self.train_resolution}.npy")
            .as_posix()
        )
        self.x_train, self.y_train = self.read_data(path_train)
        # test path
        path_test_1 = (
            Path(self.data_dir)
            .joinpath(f"darcy_test_{self.test_resolutions[0]}.npy")
            .as_posix()
        )
        self.x_test_1, self.y_test_1 = self.read_data(path_test_1)
        path_test_2 = (
            Path(self.data_dir)
            .joinpath(f"darcy_test_{self.test_resolutions[1]}.npy")
            .as_posix()
        )
        self.x_test_2, self.y_test_2 = self.read_data(path_test_2)

        # input encoder: fitted on the training inputs, applied to all splits
        if self.encode_input:
            self.input_encoder = self.encode_data(self.x_train)
            self.x_train = self.input_encoder.encode(self.x_train)
            self.x_test_1 = self.input_encoder.encode(self.x_test_1)
            self.x_test_2 = self.input_encoder.encode(self.x_test_2)
        else:
            self.input_encoder = None
        # output encoder: only the training labels are encoded here
        if self.encode_output:
            self.output_encoder = self.encode_data(self.y_train)
            self.y_train = self.output_encoder.encode(self.y_train)
        else:
            self.output_encoder = None

        # Always define ``transform_x`` so ``__getitem__`` can test it for None
        # even when positional encoding is disabled.
        if positional_encoding:
            self.transform_x = PositionalEmbedding2D(grid_boundaries)
        else:
            self.transform_x = None

    def read_data(self, path):
        """Load one ``.npy`` file and return its ``x`` and ``y`` entries as tensors.

        The file is expected to hold a pickled mapping with keys ``"x"`` and
        ``"y"``. Both get a new axis at ``self.channel_dim``; only ``x`` is
        cast to float32.

        Args:
            path (str): Path of the ``.npy`` file.

        Returns:
            Tuple[paddle.Tensor, paddle.Tensor]: The ``x`` and ``y`` tensors.
        """
        # NOTE(security): allow_pickle=True can execute arbitrary code when the
        # file comes from an untrusted source; only load trusted dataset files.
        data = np.load(path, allow_pickle=True).item()
        x = (
            paddle.to_tensor(data["x"])
            .unsqueeze(self.channel_dim)
            .astype("float32")
            .clone()
        )
        y = paddle.to_tensor(data["y"]).unsqueeze(self.channel_dim).clone()
        del data
        return x, y

    def encode_data(self, data):
        """Build a ``UnitGaussianNormalizer`` for ``data`` according to ``self.encoding``.

        'channel-wise' reduces over every axis of ``data``; 'pixel-wise'
        reduces over axis 0 only.

        Args:
            data (paddle.Tensor): Data the normalizer is constructed from.

        Returns:
            UnitGaussianNormalizer: The normalizer built from ``data``.

        Raises:
            ValueError: If ``self.encoding`` is neither 'channel-wise' nor 'pixel-wise'.
        """
        if self.encoding == "channel-wise":
            reduce_dims = list(range(data.ndim))
        elif self.encoding == "pixel-wise":
            reduce_dims = [0]
        else:
            raise ValueError(
                f"encoding should be 'channel-wise' or 'pixel-wise', but got {self.encoding}"
            )
        input_encoder = UnitGaussianNormalizer(data, reduce_dim=reduce_dims)
        return input_encoder

    def __len__(self):
        """Return the number of samples in the split selected by ``data_split``."""
        if self.data_split == "train":
            return self.x_train.shape[0]
        elif self.data_split == "test_16x16":
            return self.x_test_1.shape[0]
        else:
            return self.x_test_2.shape[0]

    def __getitem__(self, index):
        """Return ``(input_item, label_item, weight_item)`` for sample ``index``.

        The sample is taken from the split selected by ``data_split``; the
        input goes through ``transform_x`` when positional encoding is enabled.
        """
        if self.data_split == "train":
            x = self.x_train[index]
            y = self.y_train[index]

        elif self.data_split == "test_16x16":
            x = self.x_test_1[index]
            y = self.y_test_1[index]
        else:
            x = self.x_test_2[index]
            y = self.y_test_2[index]

        if self.transform_x is not None:
            x = self.transform_x(x)

        input_item = {self.input_keys[0]: x}
        label_item = {self.label_keys[0]: y}
        weight_item = self.weight_dict

        return input_item, label_item, weight_item

DGMRDataset

Bases: Dataset

Dataset class for DGMR (Deep Generative Model for Radar) model. This open-sourced UK dataset has been mirrored to HuggingFace Datasets https://huggingface.co/datasets/openclimatefix/nimrod-uk-1km. If the reader cannot load the dataset from Hugging Face, please manually download it and modify the dataset_path to the local path for loading.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
split str

The split of the dataset, "validation" or "train". Defaults to "validation".

'validation'
num_input_frames int

Number of input frames. Defaults to 4.

4
num_target_frames int

Number of target frames. Defaults to 18.

18
dataset_path str

Path to the dataset. Defaults to "openclimatefix/nimrod-uk-1km".

'openclimatefix/nimrod-uk-1km'

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.DGMRDataset(("input", ), ("output", ))
Source code in ppsci/data/dataset/dgmr_dataset.py
class DGMRDataset(io.Dataset):
    """
    Dataset class for DGMR (Deep Generative Model for Radar) model.
    This open-sourced UK dataset has been mirrored to HuggingFace Datasets https://huggingface.co/datasets/openclimatefix/nimrod-uk-1km.
    If the reader cannot load the dataset from Hugging Face, please manually download it and modify the dataset_path to the local path for loading.

    The dataset is read in streaming mode, so ``__getitem__`` ignores its index
    and simply returns the next streamed row; ``__len__`` reports a fixed 1000.

    Args:
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        split (str, optional): The split of the dataset, "validation" or "train". Defaults to "validation".
        num_input_frames (int, optional): Number of input frames. Defaults to 4.
        num_target_frames (int, optional): Number of target frames. Defaults to 18.
        dataset_path (str, optional): Path to the dataset. Defaults to "openclimatefix/nimrod-uk-1km".

    Raises:
        ModuleNotFoundError: If the `datasets` package is not installed.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.DGMRDataset(("input", ), ("output", )) # doctest: +SKIP
    """

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        split: str = "validation",
        num_input_frames: int = 4,
        num_target_frames: int = 18,
        dataset_path: str = "openclimatefix/nimrod-uk-1km",
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.num_input_frames = num_input_frames
        self.num_target_frames = num_target_frames
        if not importlib.util.find_spec("datasets"):
            raise ModuleNotFoundError(
                "Please install datasets with `pip install datasets`"
                " before using DGMRDataset."
            )
        # imported lazily so the package is only required when this dataset is used
        import datasets

        self.reader = datasets.load_dataset(
            dataset_path, "sample", split=split, streaming=True, trust_remote_code=True
        )
        # No iterator is created yet; the first ``__getitem__`` call builds one
        # through its fallback path.
        # NOTE(review): this presumably relies on ``next()`` failing on the
        # streaming dataset object itself - confirm.
        self.iter_reader = self.reader

    def __len__(self):
        """Return a fixed nominal length of 1000 (the stream itself has no length here)."""
        return 1000

    def __getitem__(self, idx):
        """Return the next streamed sample as ``(input_item, label_item)``.

        Args:
            idx: Ignored; samples are consumed sequentially from the stream.

        Returns:
            Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray]]: Input frames
            under ``input_keys[0]`` and target frames under ``label_keys[0]``.
        """
        try:
            row = next(self.iter_reader)
        except Exception:
            # Deliberately broad best-effort: any failure (exhausted stream or
            # no iterator yet) restarts iteration over a shuffled stream.
            # A fresh generator seeded with 42 is created on each restart, so
            # the shuffle seed drawn below is the same every time.
            rng = default_rng(42)
            self.iter_reader = iter(
                self.reader.shuffle(
                    seed=rng.integers(low=0, high=100000), buffer_size=1000
                )
            )
            row = next(self.iter_reader)
        radar_frames = row["radar_frames"]
        # the last ``num_target_frames`` frames are targets; the
        # ``num_input_frames`` frames right before them are inputs
        input_frames = radar_frames[
            -self.num_target_frames - self.num_input_frames : -self.num_target_frames
        ]
        target_frames = radar_frames[-self.num_target_frames :]
        # moveaxis sends the last axis to position 1 (axis order 0, 3, 1, 2)
        input_item = {
            self.input_keys[0]: np.moveaxis(input_frames, [0, 1, 2, 3], [0, 2, 3, 1])
        }
        label_item = {
            self.label_keys[0]: np.moveaxis(target_frames, [0, 1, 2, 3], [0, 2, 3, 1])
        }
        return input_item, label_item

DrivAerNetDataset

Bases: Dataset

Paddle Dataset class for the DrivAerNet dataset, handling loading, transforming, and augmenting 3D car models.

This dataset is specifically designed for aerodynamic tasks, including training machine learning models to predict aerodynamic coefficients such as drag coefficient (Cd) from 3D car models.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Tuple specifying the keys for input features. These keys correspond to the attributes of the dataset used as input to the model. For example, "vertices" represents the 3D point cloud vertices of car models.

required
label_keys Tuple[str, ...]

Tuple specifying the keys for ground-truth labels. These keys correspond to the target values, such as aerodynamic coefficients like Cd. Example: ("cd_value",)

required
weight_keys Tuple[str, ...]

Tuple specifying the keys for optional sample weights. These keys represent weighting factors that may be used to adjust loss computation during model training. Useful for handling sample imbalance. Example: ("weight_keys",)

required
subset_dir str

Path to the directory containing subset information. This directory typically contains files that divide the dataset into training, validation, and test subsets using a list of model IDs.

required
ids_file str

Path to the text file containing model IDs for the current subset. Each line in the file corresponds to a unique model ID that defines which models belong to the subset (e.g., training set or test set).

required
root_dir str

Directory containing the STL files of 3D car models. Each STL file is expected to represent a single car model and is named according to the corresponding model ID. This is the primary data source.

required
csv_file str

Path to the CSV file containing metadata for car models. This file typically includes aerodynamic properties (e.g., drag coefficient) and other descriptive attributes mapped to each model ID.

required
num_points int

Fixed number of points to sample from each 3D model. If a 3D model has more points than num_points, it will be randomly subsampled. If it has fewer points, it will be zero-padded to reach the desired number.

required
transform Optional[Callable]

An optional callable for applying data transformations. This can include augmentations such as scaling, rotation, jittering, or other preprocessing steps applied to the 3D point clouds before they are passed to the model.

None
pointcloud_exist bool

Whether the point clouds are pre-processed and saved as .paddle_tensor files. If True, the dataset will directly load the pre-saved point clouds instead of generating them from STL files.

True
train_fractions float

Fraction of the training data to use. Useful for experiments where only a portion of the data is needed.

1.0
mode str

Mode of operation, either "train", "eval", or "test". Determines how the dataset behaves.

'eval'

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.DrivAerNetDataset(
...     input_keys=("vertices",),
...     label_keys=("cd_value",),
...     weight_keys=("weight_keys",),
...     subset_dir="/path/to/subset_dir",
...     ids_file="train_ids.txt",
...     root_dir="/path/to/DrivAerNetDataset",
...     csv_file="/path/to/aero_metadata.csv",
...     num_points=1024,
...     transform=None,
... )
Source code in ppsci/data/dataset/drivaernet_dataset.py
class DrivAerNetDataset(paddle.io.Dataset):
    """
    Paddle Dataset class for the DrivAerNet dataset, handling loading, transforming, and augmenting 3D car models.

    This dataset is specifically designed for aerodynamic tasks, including training machine learning models
    to predict aerodynamic coefficients such as drag coefficient (Cd) from 3D car models.

    Args:
        input_keys (Tuple[str, ...]): Tuple specifying the keys for input features.
            These keys correspond to the attributes of the dataset used as input to the model.
            For example, "vertices" represents the 3D point cloud vertices of car models.
        label_keys (Tuple[str, ...]): Tuple specifying the keys for ground-truth labels.
            These keys correspond to the target values, such as aerodynamic coefficients like Cd.
            Example: ("cd_value",)
        weight_keys (Tuple[str, ...]): Tuple specifying the keys for optional sample weights.
            These keys represent weighting factors that may be used to adjust loss computation
            during model training. Useful for handling sample imbalance.
            Example: ("weight_keys",)
        subset_dir (str): Path to the directory containing subset information.
            This directory typically contains files that divide the dataset into training,
            validation, and test subsets using a list of model IDs.
        ids_file (str): Name of the text file (inside `subset_dir`) containing model IDs for the
            current subset. IDs are whitespace-separated and are matched against the
            "Design" column of the CSV file.
        root_dir (str): Directory containing the point cloud files of the 3D car models.
            Each file is named `<design_id>.paddle_tensor`.
        csv_file (str): Path to the CSV file containing metadata for car models.
            It must provide the "Design" and "Average Cd" columns.
        num_points (int): Fixed number of points to sample from each 3D model.
            If a 3D model has more points than `num_points`, it will be randomly subsampled.
            If it has fewer points, it will be zero-padded to reach the desired number.
        transform (Optional[Callable]): An optional callable for applying data transformations.
            This can include augmentations such as scaling, rotation, jittering, or other preprocessing
            steps applied to the 3D point clouds before they are passed to the model.
        pointcloud_exist (bool): Whether the point clouds are pre-processed and saved as `.paddle_tensor` files.
            If `True`, the dataset directly loads the pre-saved point clouds. `False` is not
            supported: this class contains no code that generates point clouds from STL files.
        train_fractions (float): Fraction of the training data to use. Useful for experiments where only a portion of the data is needed.
        mode (str): Mode of operation, either "train", "eval", or "test". Only "train" applies `train_fractions`.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.DrivAerNetDataset(
        ...     input_keys=("vertices",),
        ...     label_keys=("cd_value",),
        ...     weight_keys=("weight_keys",),
        ...     subset_dir="/path/to/subset_dir",
        ...     ids_file="train_ids.txt",
        ...     root_dir="/path/to/DrivAerNetDataset",
        ...     csv_file="/path/to/aero_metadata.csv",
        ...     num_points=1024,
        ...     transform=None,
        ... )  # doctest: +SKIP
    """

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        weight_keys: Tuple[str, ...],
        subset_dir: str,
        ids_file: str,
        root_dir: str,
        csv_file: str,
        num_points: int,
        transform: Optional[Callable] = None,
        pointcloud_exist: bool = True,
        train_fractions: float = 1.0,
        mode: str = "eval",
    ):

        super().__init__()
        self.root_dir = root_dir
        try:
            self.data_frame = pd.read_csv(csv_file)
        except Exception as e:
            logging.error(f"Failed to load CSV file: {csv_file}. Error: {e}")
            raise
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.weight_keys = weight_keys
        self.subset_dir = subset_dir
        self.ids_file = ids_file
        self.transform = transform
        self.num_points = num_points
        self.pointcloud_exist = pointcloud_exist
        self.mode = mode
        self.train_fractions = train_fractions
        self.augmentation = DataAugmentation()
        # idx -> fully built sample; filled lazily by __getitem__
        self.cache = {}

        try:
            with open(os.path.join(self.subset_dir, self.ids_file), "r") as file:
                subset_ids = file.read().split()
        except FileNotFoundError as e:
            # keep the original cause attached for easier debugging
            raise FileNotFoundError(
                f"Error loading subset file {self.ids_file}: {e}"
            ) from e

        # keep only the rows whose design ID belongs to this subset
        self.subset_indices = self.data_frame[
            self.data_frame["Design"].isin(subset_ids)
        ].index.tolist()
        self.data_frame = self.data_frame.loc[self.subset_indices].reset_index(
            drop=True
        )

        # only the training mode sub-samples the subset
        if self.mode == "train":
            self.data_frame = self.data_frame.sample(frac=self.train_fractions)

    def __len__(self) -> int:
        """Returns the total number of samples in the dataset."""
        return len(self.data_frame)

    def _sample_or_pad_vertices(
        self, vertices: paddle.Tensor, num_points: int
    ) -> paddle.Tensor:
        """
        Subsamples or pads the vertices of the model to a fixed number of points.

        Args:
            vertices: The vertices of the 3D model as a paddle.Tensor.
            num_points: The desired number of points for the model.

        Returns:
            The vertices standardized to the specified number of points.
        """
        num_vertices = vertices.shape[0]
        if num_vertices > num_points:
            indices = np.random.choice(num_vertices, num_points, replace=False)
            vertices = vertices[indices]
        elif num_vertices < num_points:
            # padding rows are 3-wide float32 zeros
            padding = paddle.zeros(
                shape=(num_points - num_vertices, 3), dtype="float32"
            )
            vertices = paddle.concat(x=(vertices, padding), axis=0)
        return vertices

    def _load_point_cloud(self, design_id: str) -> Optional[paddle.Tensor]:
        """
        Loads the pre-saved point cloud `<design_id>.paddle_tensor` from `root_dir`.

        Args:
            design_id: ID of the design whose point cloud is loaded.

        Returns:
            The vertices fixed to `self.num_points` points, or None when the
            file does not exist or is empty.

        Raises:
            Exception: If loading fails with EOFError, RuntimeError or ValueError.
        """
        load_path = os.path.join(self.root_dir, f"{design_id}.paddle_tensor")
        if os.path.exists(load_path) and os.path.getsize(load_path) > 0:
            try:
                vertices: paddle.Tensor = paddle.load(path=str(load_path))
                num_vertices = vertices.shape[0]

                if num_vertices > self.num_points:
                    indices = np.random.choice(
                        num_vertices, self.num_points, replace=False
                    )
                    vertices = vertices.numpy()[indices]
                    vertices = paddle.to_tensor(vertices)

                # after the subsampling above this call can only pad
                vertices = self._sample_or_pad_vertices(vertices, self.num_points)

                return vertices
            except (EOFError, RuntimeError, ValueError) as e:
                raise Exception(
                    f"Error loading point cloud from {load_path}: {e}"
                ) from e
        # missing or empty file: let the caller decide how to report it
        return None

    def __getitem__(
        self, idx: int, apply_augmentations: bool = True
    ) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray],]:
        """
        Retrieves a sample and its corresponding label from the dataset, with an option to apply augmentations.

        Samples are cached per index, so a repeated request returns the sample
        built on the first request (including its augmentation).

        Args:
            idx (int): Index of the sample to retrieve.
            apply_augmentations (bool, optional): Whether to apply data augmentations. Defaults to True.

        Returns:
            Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray]]:
                A tuple containing three dictionaries:
                    - The first dictionary contains the input data (point cloud) under the key specified by `self.input_keys[0]`.
                    - The second dictionary contains the label (Cd value) under the key specified by `self.label_keys[0]`.
                    - The third dictionary contains the weight (default is 1) under the key specified by `self.weight_keys[0]`.

        Raises:
            NotImplementedError: If `self.pointcloud_exist` is False.
            ValueError: If the point cloud cannot be loaded.
        """
        if paddle.is_tensor(x=idx):
            idx = idx.tolist()

        if idx in self.cache:
            return self.cache[idx]

        row = self.data_frame.iloc[idx]
        design_id = row["Design"]
        cd_value = row["Average Cd"].reshape([-1])
        if not self.pointcloud_exist:
            # Without this guard the code below fails on an unbound `vertices`.
            raise NotImplementedError(
                "Generating point clouds from STL files is not supported; "
                "pre-compute them and set `pointcloud_exist=True`."
            )
        try:
            vertices = self._load_point_cloud(design_id)
            if vertices is None:
                raise ValueError(
                    f"Point cloud for design {design_id} is not found or corrupted."
                )
        except Exception as e:
            raise ValueError(
                f"Failed to load point cloud for design {design_id}: {e}"
            ) from e
        if apply_augmentations:
            # the augmentation helpers receive the NumPy copy of the tensor
            vertices = self.augmentation.translate_pointcloud(vertices.numpy())
            vertices = self.augmentation.jitter_pointcloud(vertices)
        if self.transform:
            vertices = self.transform(vertices)

        # build the sample once and share it between the cache and the caller
        sample = (
            {self.input_keys[0]: vertices},
            {self.label_keys[0]: cd_value},
            {self.weight_keys[0]: np.array(1, dtype=np.float32)},
        )
        self.cache[idx] = sample
        return sample

__getitem__(idx, apply_augmentations=True)

Retrieves a sample and its corresponding label from the dataset, with an option to apply augmentations.

Parameters:

Name Type Description Default
idx int

Index of the sample to retrieve.

required
apply_augmentations bool

Whether to apply data augmentations. Defaults to True.

True

Returns:

Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray]]: A tuple containing three dictionaries:

- The first dictionary contains the input data (point cloud) under the key specified by self.input_keys[0].
- The second dictionary contains the label (Cd value) under the key specified by self.label_keys[0].
- The third dictionary contains the weight (default is 1) under the key specified by self.weight_keys[0].

Source code in ppsci/data/dataset/drivaernet_dataset.py
def __getitem__(
    self, idx: int, apply_augmentations: bool = True
) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray],]:
    """
    Retrieves a sample and its corresponding label from the dataset, with an option to apply augmentations.

    Samples are cached per index, so a repeated request returns the sample
    built on the first request (including its augmentation).

    Args:
        idx (int): Index of the sample to retrieve.
        apply_augmentations (bool, optional): Whether to apply data augmentations. Defaults to True.

    Returns:
        Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray]]:
            A tuple containing three dictionaries:
                - The first dictionary contains the input data (point cloud) under the key specified by `self.input_keys[0]`.
                - The second dictionary contains the label (Cd value) under the key specified by `self.label_keys[0]`.
                - The third dictionary contains the weight (default is 1) under the key specified by `self.weight_keys[0]`.

    Raises:
        NotImplementedError: If `self.pointcloud_exist` is False.
        ValueError: If the point cloud cannot be loaded.
    """
    if paddle.is_tensor(x=idx):
        idx = idx.tolist()

    if idx in self.cache:
        return self.cache[idx]

    row = self.data_frame.iloc[idx]
    design_id = row["Design"]
    cd_value = row["Average Cd"].reshape([-1])
    if not self.pointcloud_exist:
        # Without this guard the code below fails on an unbound `vertices`.
        raise NotImplementedError(
            "Generating point clouds from STL files is not supported; "
            "pre-compute them and set `pointcloud_exist=True`."
        )
    try:
        vertices = self._load_point_cloud(design_id)
        if vertices is None:
            raise ValueError(
                f"Point cloud for design {design_id} is not found or corrupted."
            )
    except Exception as e:
        raise ValueError(
            f"Failed to load point cloud for design {design_id}: {e}"
        ) from e
    if apply_augmentations:
        # the augmentation helpers receive the NumPy copy of the tensor
        vertices = self.augmentation.translate_pointcloud(vertices.numpy())
        vertices = self.augmentation.jitter_pointcloud(vertices)
    if self.transform:
        vertices = self.transform(vertices)

    # build the sample once and share it between the cache and the caller
    sample = (
        {self.input_keys[0]: vertices},
        {self.label_keys[0]: cd_value},
        {self.weight_keys[0]: np.array(1, dtype=np.float32)},
    )
    self.cache[idx] = sample
    return sample

__len__()

Returns the total number of samples in the dataset.

Source code in ppsci/data/dataset/drivaernet_dataset.py
def __len__(self) -> int:
    """Report how many samples the subset data frame currently holds."""
    sample_count = len(self.data_frame)
    return sample_count

DrivAerNetPlusPlusDataset

Bases: Dataset

Paddle Dataset class for the DrivAerNet dataset, handling loading, transforming, and augmenting 3D car models.

This dataset is designed for tasks involving aerodynamic simulations and deep learning models, specifically for predicting aerodynamic coefficients (e.g., Cd values) from 3D car models.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Tuple of strings specifying the input keys. These keys correspond to the features extracted from the dataset, typically the 3D vertices of car models. Example: ("vertices",)

required
label_keys Tuple[str, ...]

Tuple of strings specifying the label keys. These keys correspond to the ground-truth labels, such as aerodynamic coefficients (e.g., Cd values). Example: ("cd_value",)

required
weight_keys Tuple[str, ...]

Tuple of strings specifying the weight keys. These keys represent optional weighting factors used during model training to handle class imbalance or sample importance. Example: ("weight_keys",)

required
subset_dir str

Path to the directory containing subsets of the dataset. This directory is used to divide the dataset into different subsets (e.g., train, validation, test) based on provided IDs.

required
ids_file str

Path to the file containing the list of IDs for the subset. The file specifies which models belong to the current subset (e.g., training IDs).

required
root_dir str

Root directory containing the 3D STL files of car models. Each 3D model is expected to be stored in a file named according to its ID.

required
csv_file str

Path to the CSV file containing metadata for the car models. The CSV file includes information such as aerodynamic coefficients, and may also map model IDs to specific attributes.

required
num_points int

Number of points to sample or pad each 3D point cloud to. If the model has more points than num_points, it will be subsampled. If it has fewer points, zero-padding will be applied.

required
transform Optional[Callable]

Optional transformation function applied to each sample. This can include augmentations like scaling, rotation, or jittering.

None
pointcloud_exist bool

Whether the point clouds are pre-processed and saved as .pt files. If True, the dataset will directly load the pre-saved point clouds instead of generating them from STL files.

True

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.DrivAerNetPlusPlusDataset(
...     input_keys=("vertices",),
...     label_keys=("cd_value",),
...     weight_keys=("weight_keys",),
...     subset_dir="/path/to/subset_dir",
...     ids_file="train_ids.txt",
...     root_dir="/path/to/DrivAerNetPlusPlusDataset",
...     csv_file="/path/to/aero_metadata.csv",
...     num_points=1024,
...     transform=None,
... )  # doctest: +SKIP

Source code in ppsci/data/dataset/drivaernetplusplus_dataset.py
class DrivAerNetPlusPlusDataset(paddle.io.Dataset):
    """
    Paddle Dataset class for the DrivAerNet dataset, handling loading, transforming, and augmenting 3D car models.

    This dataset is designed for tasks involving aerodynamic simulations and deep learning models,
    specifically for predicting aerodynamic coefficients (e.g., Cd values) from 3D car models.

    Args:
        input_keys (Tuple[str, ...]): Tuple of strings specifying the input keys.
            Only the first key is used; it names the point cloud in each sample.
            Example: ("vertices",)
        label_keys (Tuple[str, ...]): Tuple of strings specifying the label keys.
            Only the first key is used; it names the Cd value in each sample.
            Example: ("cd_value",)
        weight_keys (Tuple[str, ...]): Tuple of strings specifying the weight keys.
            Only the first key is used; the weight stored under it is always 1.
            Example: ("weight_keys",)
        subset_dir (str): Directory that contains `ids_file`.
        ids_file (str): Name of the whitespace-separated file (relative to `subset_dir`)
            listing the design IDs that belong to this subset (e.g., training IDs).
        root_dir (str): Directory containing one `<design_id>.paddle_tensor` file per design.
        csv_file (str): Path to the CSV file containing metadata for the car models.
            It must provide the columns "Design" and "Average Cd".
        num_points (int): Number of points to sample or pad each 3D point cloud to.
            If the model has more points than `num_points`, it will be subsampled.
            If it has fewer points, zero-padding will be applied.
        transform (Optional[Callable]): Optional transformation function applied to each sample
            after augmentation and before normalization.
        pointcloud_exist (bool): Whether the point clouds are pre-processed and saved on disk.
            Only `True` is supported by this class; with `False`, `__getitem__` raises
            `NotImplementedError` because no STL-to-point-cloud conversion is implemented here.

    Raises:
        FileNotFoundError: If the subset file `subset_dir/ids_file` does not exist.
        Exception: Whatever `pandas.read_csv` raises for `csv_file` (logged, then re-raised).

    Examples:
    >>> import ppsci
    >>> dataset = ppsci.data.dataset.DrivAerNetPlusPlusDataset(
    ...     input_keys=("vertices",),
    ...     label_keys=("cd_value",),
    ...     weight_keys=("weight_keys",),
    ...     subset_dir="/path/to/subset_dir",
    ...     ids_file="train_ids.txt",
    ...     root_dir="/path/to/DrivAerNetPlusPlusDataset",
    ...     csv_file="/path/to/aero_metadata.csv",
    ...     num_points=1024,
    ...     transform=None,
    ... )  # doctest: +SKIP
    """

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        weight_keys: Tuple[str, ...],
        subset_dir: str,
        ids_file: str,
        root_dir: str,
        csv_file: str,
        num_points: int,
        transform: Optional[Callable] = None,
        pointcloud_exist: bool = True,
    ):
        super().__init__()
        self.root_dir = root_dir
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.weight_keys = weight_keys
        self.subset_dir = subset_dir
        self.ids_file = ids_file
        self.augmentation = DataAugmentation()
        # Maps sample index -> fully processed sample tuple (see __getitem__).
        self.cache = {}

        try:
            self.data_frame = pd.read_csv(csv_file)
        except Exception as e:
            logging.error(f"Failed to load CSV file: {csv_file}. Error: {e}")
            raise
        self.transform = transform
        self.num_points = num_points
        self.pointcloud_exist = pointcloud_exist

        try:
            with open(os.path.join(self.subset_dir, self.ids_file), "r") as file:
                subset_ids = file.read().split()
        except FileNotFoundError as e:
            # Chain explicitly so the original OS error stays attached as the cause.
            raise FileNotFoundError(
                f"Error loading subset file {self.ids_file}: {e}"
            ) from e

        # Keep only the rows whose "Design" ID is listed in the subset file and
        # renumber them from 0 so that `iloc[idx]` matches the dataset index.
        self.subset_indices = self.data_frame[
            self.data_frame["Design"].isin(subset_ids)
        ].index.tolist()
        self.data_frame = self.data_frame.loc[self.subset_indices].reset_index(
            drop=True
        )

    def __len__(self) -> int:
        """Returns the total number of samples in the dataset."""
        return len(self.data_frame)

    def min_max_normalize(self, data: np.ndarray) -> np.ndarray:
        """
        Normalizes the data to the range [0, 1] based on min and max values.

        The minimum and maximum are taken along axis 0 (one pair per column).
        NumPy arrays are reduced with `keepdims=True`; any other input keeps the
        original code path and is expected to accept `keepdim=True` in its
        `min`/`max` methods (the keyword used by the original implementation).

        Note:
            A column whose minimum equals its maximum leads to a division by zero;
            that case is not special-cased here.

        Args:
            data: Array or tensor-like object to normalize.

        Returns:
            The normalized data, same container type as the input.
        """
        if isinstance(data, np.ndarray):
            # NumPy spells the keyword `keepdims`; `keepdim` raises TypeError.
            min_vals = data.min(axis=0, keepdims=True)
            max_vals = data.max(axis=0, keepdims=True)
        else:
            min_vals = data.min(axis=0, keepdim=True)
            max_vals = data.max(axis=0, keepdim=True)
        normalized_data = (data - min_vals) / (max_vals - min_vals)
        return normalized_data

    def _sample_or_pad_vertices(
        self, vertices: paddle.Tensor, num_points: int
    ) -> paddle.Tensor:
        """
        Subsamples or pads the vertices of the model to a fixed number of points.

        Args:
            vertices: The vertices of the 3D model as a paddle.Tensor.
            num_points: The desired number of points for the model.

        Returns:
            The vertices standardized to the specified number of points.
        """
        num_vertices = vertices.shape[0]
        if num_vertices > num_points:
            # Random subset without repetition.
            indices = np.random.choice(num_vertices, num_points, replace=False)
            vertices = vertices[indices]
        elif num_vertices < num_points:
            # Zero rows are appended; the padding is hard-coded to 3 columns.
            padding = paddle.zeros(
                shape=(num_points - num_vertices, 3), dtype="float32"
            )
            vertices = paddle.concat(x=(vertices, padding), axis=0)
        return vertices

    def _load_point_cloud(self, design_id: str):
        """
        Loads the pre-computed point cloud of one design and fixes its length.

        Args:
            design_id: Design ID; the file read is `<root_dir>/<design_id>.paddle_tensor`.

        Returns:
            The point cloud with exactly `self.num_points` rows, or `None` when the
            file is missing or empty.

        Raises:
            Exception: If `paddle.load` fails with EOFError, RuntimeError or ValueError.
        """
        load_path = os.path.join(self.root_dir, f"{design_id}.paddle_tensor")
        if not (os.path.exists(load_path) and os.path.getsize(load_path) > 0):
            # The caller turns this into a ValueError.
            return None

        try:
            vertices: paddle.Tensor = paddle.load(path=str(load_path))
        except (EOFError, RuntimeError, ValueError) as e:
            raise Exception(
                f"Error loading point cloud from {load_path}: {e}"
            ) from e
        num_vertices = vertices.shape[0]

        if num_vertices > self.num_points:
            # Subsample through NumPy; after this the helper below only has to pad.
            indices = np.random.choice(num_vertices, self.num_points, replace=False)
            vertices = vertices.numpy()[indices]
            vertices = paddle.to_tensor(vertices)

        vertices = self._sample_or_pad_vertices(vertices, self.num_points)

        return vertices

    def __getitem__(
        self, idx: int, apply_augmentations: bool = True
    ) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray]]:
        """
        Retrieves a sample and its corresponding label from the dataset, with an option to apply augmentations.

        Processed samples are memoized in `self.cache`, so a second request for the
        same index returns the stored result as-is.

        Args:
            idx (int): Index of the sample to retrieve.
            apply_augmentations (bool, optional): Whether to apply data augmentations. Defaults to True.

        Returns:
            Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray]]:
                A tuple containing three dictionaries:
                    - The first dictionary contains the input data (point cloud) under the key specified by `self.input_keys[0]`.
                    - The second dictionary contains the label (Cd value) under the key specified by `self.label_keys[0]`.
                    - The third dictionary contains the weight (default is 1) under the key specified by `self.weight_keys[0]`.

        Raises:
            NotImplementedError: If the dataset was built with `pointcloud_exist=False`.
            ValueError: If the point cloud file is missing, empty, or cannot be loaded.
        """
        if paddle.is_tensor(idx):
            idx = idx.tolist()

        # NOTE(review): the cache stores the augmented sample, so the random
        # augmentation is drawn only on first access and `apply_augmentations`
        # is ignored on cache hits -- confirm this is intended.
        if idx in self.cache:
            return self.cache[idx]

        row = self.data_frame.iloc[idx]
        design_id = row["Design"]
        cd_value = row["Average Cd"]

        if not self.pointcloud_exist:
            # Previously `vertices` stayed unbound here and the method crashed
            # with UnboundLocalError; fail with an explicit message instead.
            raise NotImplementedError(
                "Generating point clouds from STL files is not implemented; "
                "pre-compute them and use pointcloud_exist=True."
            )

        try:
            vertices = self._load_point_cloud(design_id)
            if vertices is None:
                raise ValueError(
                    f"Point cloud for design {design_id} is not found or corrupted."
                )
        except Exception as e:
            raise ValueError(
                f"Failed to load point cloud for design {design_id}: {e}"
            ) from e

        if apply_augmentations:
            # DataAugmentation is defined elsewhere in the module.
            vertices = self.augmentation.translate_pointcloud(vertices.numpy())
            vertices = self.augmentation.jitter_pointcloud(vertices)

        if self.transform:
            vertices = self.transform(vertices)

        vertices = self.min_max_normalize(vertices)

        # The label is a 1-element float32 array.
        cd_value = np.array(float(cd_value), dtype=np.float32).reshape([-1])

        sample = (
            {self.input_keys[0]: vertices},
            {self.label_keys[0]: cd_value},
            {self.weight_keys[0]: np.array(1, dtype=np.float32)},
        )
        self.cache[idx] = sample
        return sample

__getitem__(idx, apply_augmentations=True)

Retrieves a sample and its corresponding label from the dataset, with an option to apply augmentations.

Parameters:

Name Type Description Default
idx int

Index of the sample to retrieve.

required
apply_augmentations bool

Whether to apply data augmentations. Defaults to True.

True

Returns:

Type Description
Tuple[Dict[str, ndarray], Dict[str, ndarray], Dict[str, ndarray]]

A tuple containing three dictionaries: (1) the input data (point cloud) under the key `self.input_keys[0]`; (2) the label (Cd value) under the key `self.label_keys[0]`; (3) the weight (always 1) under the key `self.weight_keys[0]`.

Source code in ppsci/data/dataset/drivaernetplusplus_dataset.py
def __getitem__(
    self, idx: int, apply_augmentations: bool = True
) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray]]:
    """
    Retrieves a sample and its corresponding label from the dataset, with an option to apply augmentations.

    Processed samples are memoized in `self.cache`, so a second request for the
    same index returns the stored result as-is.

    Args:
        idx (int): Index of the sample to retrieve.
        apply_augmentations (bool, optional): Whether to apply data augmentations. Defaults to True.

    Returns:
        Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray]]:
            A tuple containing three dictionaries:
                - The first dictionary contains the input data (point cloud) under the key specified by `self.input_keys[0]`.
                - The second dictionary contains the label (Cd value) under the key specified by `self.label_keys[0]`.
                - The third dictionary contains the weight (default is 1) under the key specified by `self.weight_keys[0]`.

    Raises:
        NotImplementedError: If the dataset was built with `pointcloud_exist=False`.
        ValueError: If the point cloud file is missing, empty, or cannot be loaded.
    """
    if paddle.is_tensor(idx):
        idx = idx.tolist()

    # NOTE(review): the cache stores the augmented sample, so the random
    # augmentation is drawn only on first access and `apply_augmentations`
    # is ignored on cache hits -- confirm this is intended.
    if idx in self.cache:
        return self.cache[idx]

    row = self.data_frame.iloc[idx]
    design_id = row["Design"]
    cd_value = row["Average Cd"]

    if not self.pointcloud_exist:
        # Previously `vertices` stayed unbound here and the method crashed
        # with UnboundLocalError; fail with an explicit message instead.
        raise NotImplementedError(
            "Generating point clouds from STL files is not implemented; "
            "pre-compute them and use pointcloud_exist=True."
        )

    try:
        vertices = self._load_point_cloud(design_id)
        if vertices is None:
            raise ValueError(
                f"Point cloud for design {design_id} is not found or corrupted."
            )
    except Exception as e:
        raise ValueError(
            f"Failed to load point cloud for design {design_id}: {e}"
        ) from e

    if apply_augmentations:
        vertices = self.augmentation.translate_pointcloud(vertices.numpy())
        vertices = self.augmentation.jitter_pointcloud(vertices)

    if self.transform:
        vertices = self.transform(vertices)

    vertices = self.min_max_normalize(vertices)

    # The label is a 1-element float32 array.
    cd_value = np.array(float(cd_value), dtype=np.float32).reshape([-1])

    sample = (
        {self.input_keys[0]: vertices},
        {self.label_keys[0]: cd_value},
        {self.weight_keys[0]: np.array(1, dtype=np.float32)},
    )
    self.cache[idx] = sample
    return sample

__len__()

Returns the total number of samples in the dataset.

Source code in ppsci/data/dataset/drivaernetplusplus_dataset.py
def __len__(self) -> int:
    """Report the dataset size, i.e. the length of `self.data_frame`."""
    sample_count = len(self.data_frame)
    return sample_count

min_max_normalize(data)

Normalizes the data to the range [0, 1] based on min and max values.

Source code in ppsci/data/dataset/drivaernetplusplus_dataset.py
def min_max_normalize(self, data: np.ndarray) -> np.ndarray:
    """
    Normalizes the data to the range [0, 1] based on min and max values.

    The minimum and maximum are taken along axis 0 (one pair per column).
    NumPy arrays are reduced with `keepdims=True`; any other input keeps the
    original code path and is expected to accept `keepdim=True` in its
    `min`/`max` methods (the keyword used by the original implementation).

    Note:
        A column whose minimum equals its maximum leads to a division by zero;
        that case is not special-cased here.

    Args:
        data: Array or tensor-like object to normalize.

    Returns:
        The normalized data, same container type as the input.
    """
    if isinstance(data, np.ndarray):
        # NumPy spells the keyword `keepdims`; `keepdim` raises TypeError.
        min_vals = data.min(axis=0, keepdims=True)
        max_vals = data.max(axis=0, keepdims=True)
    else:
        min_vals = data.min(axis=0, keepdim=True)
        max_vals = data.max(axis=0, keepdim=True)
    normalized_data = (data - min_vals) / (max_vals - min_vals)
    return normalized_data

ERA5ClimateDataset

Bases: Dataset

ERA5 dataset for multi-meteorological-element climate prediction (r, t, u, v).

Parameters:

Name Type Description Default
file_path str

Dataset directory. It must contain `mean.nc`, `std.nc`, and one `<year>.h5` file for each year that is loaded.

required
input_keys Tuple[str, ...]

Input dict keys, e.g. ("input",).

required
label_keys Tuple[str, ...]

Label dict keys, e.g. ("output",).

required
size Tuple[int, int]

Crop size (height, width).

required
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
transforms Optional[Compose]

Optional transforms. Defaults to None.

None
training bool

Whether the dataset is in training mode. In training mode the default years are 2016–2018; otherwise (validation mode) the default year is 2019.

True
stride int

Stride for sampling. Defaults to 1.

1
sq_length int

Sequence length for input and output. Defaults to 6.

6
years Optional[List[str]]

List of years to load. Defaults to None (use default years).

None
Source code in ppsci/data/dataset/era5climate_dataset.py
class ERA5ClimateDataset(io.Dataset):
    """ERA5 dataset for multi-meteorological-element climate prediction (r, t, u, v).

    All hourly data of the selected years is loaded into memory at construction
    time. Each sample consists of `sq_length` consecutive weekly means as input
    and the following `sq_length` weekly means as label, where a "week" is a
    window of 168 consecutive hourly records.

    Args:
        file_path (str): Dataset directory. It must contain `mean.nc`, `std.nc`
            and one `<year>.h5` file (with a "data" entry) per loaded year.
        input_keys (Tuple[str, ...]): Input dict keys, e.g. ("input",). Only the first key is used.
        label_keys (Tuple[str, ...]): Label dict keys, e.g. ("output",). Only the first key is used.
        size (Tuple[int, int]): Crop size (height, width). A single number is
            interpreted as a square crop.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        transforms (Optional[vision.Compose]): Optional transforms. Defaults to None.
        training (bool): Whether in training mode. Selects the default years:
            2016-2018 for training, 2019 otherwise.
        stride (int): Stride for sampling. Defaults to 1. Stored on the instance;
            the indexing logic in this class does not read it.
        sq_length (int): Sequence length for input and output. Defaults to 6.
        years (Optional[List[str]]): List of years to load. Defaults to None (use default years).

    Raises:
        FileNotFoundError: If a `<year>.h5` file is missing.
    """

    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        size: Tuple[int, ...],
        weight_dict: Optional[Dict[str, float]] = None,
        transforms: Optional[vision.Compose] = None,
        training: bool = True,
        stride: int = 1,
        sq_length: int = 6,
        years: Optional[List[str]] = None,
    ):
        super().__init__()
        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.size = size
        self.training = training
        self.sq_length = sq_length
        self.transforms = transforms
        self.stride = stride
        self.group_size = 24 * 7  # 168 hours per week

        mean_file_path = os.path.join(self.file_path, "mean.nc")
        std_file_path = os.path.join(self.file_path, "std.nc")

        # `.values` materializes the statistics as NumPy arrays, so the files
        # can be closed as soon as the `with` blocks end.
        with xr.open_dataset(mean_file_path) as mean_ds:
            self.mean = mean_ds["mean"].values.reshape(-1, 1, 1)
        with xr.open_dataset(std_file_path) as std_ds:
            self.std = std_ds["std"].values.reshape(-1, 1, 1)

        print("Start loading all hourly data from the HDF5 file...")
        start_time = time.time()

        if years is None:
            years = ["2016", "2017", "2018"] if self.training else ["2019"]

        all_hourly_data = []
        for year in years:
            h5_filepath = os.path.join(self.file_path, f"{year}.h5")
            if not os.path.exists(h5_filepath):
                raise FileNotFoundError(f"h5 file not found: {h5_filepath}")

            print(f"Loading {h5_filepath}...")
            with h5py.File(h5_filepath, "r") as hf:
                # `[:]` reads the whole entry into memory before the file closes.
                all_hourly_data.append(hf["data"][:])

        # Years are joined along axis 0 in the order given.
        self.data_hourly = np.concatenate(all_hourly_data, axis=0)

        end_time = time.time()
        print("Data loaded!")
        print(
            f"Total hours: {self.data_hourly.shape[0]}, Shape: {self.data_hourly.shape}"
        )
        print(f"Estimated memory usage: {self.data_hourly.nbytes / 1e9:.2f} GB")
        print(f"Loading time: {end_time - start_time:.2f} seconds.")

        if weight_dict is None:
            self.weight_dict = {}
        else:
            # Every label key defaults to weight 1.0; user values override.
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

    def __len__(self):
        """Return the number of start positions that fit a full input+label window.

        The result is clamped at 0 so that `len()` does not raise when fewer
        records than one full window were loaded.
        """
        span = 2 * self.sq_length * self.group_size
        return max(self.data_hourly.shape[0] - span + 1, 0)

    def __getitem__(self, global_idx):
        """Build one (input, label, weight) sample starting at record `global_idx`.

        Args:
            global_idx: Index of the first hourly record of the input window.

        Returns:
            Tuple of three dicts: input, label and weight items.
        """
        window = self.sq_length * self.group_size

        # The label window starts exactly where the input window ends.
        x_start_hour = global_idx
        x_end_hour = x_start_hour + window
        y_start_hour = x_end_hour
        y_end_hour = y_start_hour + window

        x_hourly = self.data_hourly[x_start_hour:x_end_hour]
        y_hourly = self.data_hourly[y_start_hour:y_end_hour]

        # Split each window into `sq_length` groups of `group_size` records.
        x_weekly_groups = x_hourly.reshape(
            self.sq_length, self.group_size, *x_hourly.shape[1:]
        )
        y_weekly_groups = y_hourly.reshape(
            self.sq_length, self.group_size, *y_hourly.shape[1:]
        )

        x = np.mean(x_weekly_groups, axis=1)  # x.shape: (sq_length, 12, H, W)
        y = np.mean(y_weekly_groups, axis=1)  # y.shape: (sq_length, 12, H, W)

        x = (x - self.mean) / self.std
        y = (y - self.mean) / self.std

        # The same random window is cut out of input and label.
        x, y = self._random_crop(x, y)

        input_item = {self.input_keys[0]: x.astype(np.float32)}
        label_item = {self.label_keys[0]: y.astype(np.float32)}

        # Weights are all-ones-shaped arrays with the same rank as the label.
        weight_shape = [1] * len(next(iter(label_item.values())).shape)
        weight_item = {
            key: np.full(weight_shape, value, paddle.get_default_dtype())
            for key, value in self.weight_dict.items()
        }

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return input_item, label_item, weight_item

    def _random_crop(self, x, y):
        """Cut the same randomly placed `self.size` window out of `x` and `y`.

        The crop is applied to the last two axes. If `self.size` is a single
        number it is replaced, on the instance, by a square (size, size) tuple.
        """
        if isinstance(self.size, numbers.Number):
            self.size = (int(self.size), int(self.size))

        th, tw = self.size
        h, w = y.shape[-2], y.shape[-1]  # Get the original height and width from y

        # randint is inclusive on both ends, so the crop may touch the border.
        x1 = random.randint(0, w - tw)
        y1 = random.randint(0, h - th)

        x_cropped = x[..., y1 : y1 + th, x1 : x1 + tw]
        y_cropped = y[..., y1 : y1 + th, x1 : x1 + tw]

        return x_cropped, y_cropped

ERA5Dataset

Bases: Dataset

Class for ERA5 dataset.

Parameters:

Name Type Description Default
file_path str

Data set path.

required
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
precip_file_path Optional[str]

Precipitation data set path. Defaults to None.

None
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
vars_channel Optional[Tuple[int, ...]]

The variable channel index in ERA5 dataset. Defaults to None.

None
num_label_timestamps int

Number of timestamp of label. Defaults to 1.

1
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None
training bool

Whether in train mode. Defaults to True.

True
stride int

Stride of sampling data. Defaults to 1.

1

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.ERA5Dataset(
...     file_path="/path/to/ERA5Dataset",
...     input_keys=("input",),
...     label_keys=("output",),
... )
Source code in ppsci/data/dataset/era5_dataset.py
class ERA5Dataset(io.Dataset):
    """Class for ERA5 dataset.

    Args:
        file_path (str): Data set path. Either a single `.h5` file or a directory
            whose `*.h5` files are used in sorted order (one file per "year").
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",). One key is
            consumed per label timestamp.
        precip_file_path (Optional[str]): Precipitation data set path. Defaults to None.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        vars_channel (Optional[Tuple[int, ...]]): The variable channel index in ERA5 dataset.
            Defaults to None, which selects channels 0..19.
        num_label_timestamps (int, optional): Number of timestamp of label. Defaults to 1.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.
        training (bool, optional): Whether in train mode. Defaults to True.
        stride (int, optional): Stride of sampling data. Defaults to 1.

    Raises:
        FileNotFoundError: If no `.h5` file is found under `file_path`.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.ERA5Dataset(
        ...     file_path="/path/to/ERA5Dataset",
        ...     input_keys=("input",),
        ...     label_keys=("output",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        precip_file_path: Optional[str] = None,
        weight_dict: Optional[Dict[str, float]] = None,
        vars_channel: Optional[Tuple[int, ...]] = None,
        num_label_timestamps: int = 1,
        transforms: Optional[vision.Compose] = None,
        training: bool = True,
        stride: int = 1,
    ):
        super().__init__()
        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.precip_file_path = precip_file_path

        if weight_dict is None:
            self.weight_dict = {}
        else:
            # Every label key defaults to weight 1.0; user values override.
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        self.vars_channel = list(range(20)) if vars_channel is None else vars_channel
        self.num_label_timestamps = num_label_timestamps
        self.transforms = transforms
        self.training = training
        self.stride = stride

        self.files = self.read_data(file_path)
        if not self.files:
            # Without this check the next statements fail with a bare IndexError.
            raise FileNotFoundError(f"No .h5 files found at: {file_path}")
        self.n_years = len(self.files)
        # The sample count of the first file is assumed for every file.
        self.num_samples_per_year = self.files[0].shape[0]
        self.num_samples = self.n_years * self.num_samples_per_year
        if self.precip_file_path is not None:
            self.precip_files = self.read_data(precip_file_path, "tp")

    def read_data(self, path: str, var="fields"):
        """Open the HDF5 file(s) under `path` and return their `var` entries.

        Args:
            path: A single `.h5` file, or a directory searched for `*.h5`.
            var: Name of the entry to take from each file. Defaults to "fields".

        Returns:
            List with one h5py entry per file, in sorted path order. The files
            are left open so the entries can be indexed later in `__getitem__`.
        """
        paths = [path] if path.endswith(".h5") else glob.glob(path + "/*.h5")
        paths.sort()
        files = []
        for path_ in paths:
            _file = h5py.File(path_, "r")
            files.append(_file[var])
        return files

    def __len__(self):
        """Return the number of samples after applying `stride`."""
        return self.num_samples // self.stride

    def __getitem__(self, global_idx):
        """Build one (input, label, weight) sample.

        Args:
            global_idx: Sample index in strided units; it is multiplied by
                `self.stride` to obtain the position in the underlying data.

        Returns:
            Tuple of three dicts: input, label and weight items.
        """
        global_idx *= self.stride
        year_idx = global_idx // self.num_samples_per_year
        local_idx = global_idx % self.num_samples_per_year
        # The last sample of a file has no successor, so it is its own label.
        step = 0 if local_idx >= self.num_samples_per_year - 1 else 1

        if self.num_label_timestamps > 1:
            # Pull the index back so that all label timestamps stay inside the file.
            if local_idx >= self.num_samples_per_year - self.num_label_timestamps:
                local_idx = self.num_samples_per_year - self.num_label_timestamps - 1

        input_file = self.files[year_idx]
        label_file = (
            self.precip_files[year_idx]
            if self.precip_file_path is not None
            else input_file
        )
        if self.precip_file_path is not None and year_idx == 0 and self.training:
            # first year has 2 missing samples in precip (they are first two time points)
            lim = self.num_samples_per_year - 2
            local_idx = local_idx % lim
            step = 0 if local_idx >= lim - 1 else 1
            input_idx = local_idx + 2
            label_idx = local_idx + step
        else:
            input_idx, label_idx = local_idx, local_idx + step

        input_item = {self.input_keys[0]: input_file[input_idx, self.vars_channel]}

        label_item = {}
        for i in range(self.num_label_timestamps):
            if self.precip_file_path is not None:
                # Precipitation labels get a new leading axis of length 1.
                label_item[self.label_keys[i]] = np.expand_dims(
                    label_file[label_idx + i], 0
                )
            else:
                label_item[self.label_keys[i]] = label_file[
                    label_idx + i, self.vars_channel
                ]

        # Weights are all-ones-shaped arrays with the same rank as the label.
        weight_shape = [1] * len(next(iter(label_item.values())).shape)
        weight_item = {
            key: np.full(weight_shape, value, paddle.get_default_dtype())
            for key, value in self.weight_dict.items()
        }

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return input_item, label_item, weight_item

ERA5MeteoDataset

Bases: Dataset

ERA5 dataset for multi-meteorological-element prediction (r, t, u, v).

Parameters:

Name Type Description Default
file_path str

Dataset path (contains .npy files in year folders).

required
input_keys Tuple[str, ...]

Input dict keys, e.g. ("input",).

required
label_keys Tuple[str, ...]

Label dict keys, e.g. ("output",).

required
size Tuple[int, int]

Crop size (height, width).

required
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
transforms Optional[Compose]

Optional transforms. Defaults to None.

None
training bool

Whether the dataset is in training mode. In training mode the year folders 2016–2018 are used; otherwise the year folders 2016 and 2019 are used.

True
stride int

Stride for sampling. Defaults to 1.

1
sq_length int

Sequence length for input and output. Defaults to 6.

6
Source code in ppsci/data/dataset/era5meteo_dataset.py
class ERA5MeteoDataset(io.Dataset):
    """ERA5 dataset for multi-meteorological-element prediction (r, t, u, v).

    Each sample consists of `sq_length` consecutive time steps as input and the
    following `sq_length` time steps as label. Time steps are discovered from
    the `r_YYYYMMDDHH*.npy` files found in the year folders.

    Args:
        file_path (str): Dataset path (contains .npy files in year folders, plus
            `mean.nc` and `std.nc`).
        input_keys (Tuple[str, ...]): Input dict keys, e.g. ("input",). Only the first key is used.
        label_keys (Tuple[str, ...]): Label dict keys, e.g. ("output",). Only the first key is used.
        size (Tuple[int, int]): Crop size (height, width). A single number is
            interpreted as a square crop.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        transforms (Optional[vision.Compose]): Optional transforms. Defaults to None.
        training (bool): Whether in training mode. Training uses the year folders
            2016-2018; otherwise the year folders 2016 and 2019 are used.
        stride (int): Stride for sampling. Defaults to 1. Stored on the instance;
            the indexing logic in this class does not read it.
        sq_length (int): Sequence length for input and output. Defaults to 6.
    """

    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        size: Tuple[int, ...],
        weight_dict: Optional[Dict[str, float]] = None,
        transforms: Optional[vision.Compose] = None,
        training: bool = True,
        stride: int = 1,
        sq_length: int = 6,
    ):
        super().__init__()
        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.size = size
        self.training = training
        self.sq_length = sq_length
        self.transforms = transforms
        self.stride = stride

        mean_file_path = os.path.join(self.file_path, "mean.nc")
        std_file_path = os.path.join(self.file_path, "std.nc")

        # `.values` materializes the statistics as NumPy arrays, so the files
        # can be closed as soon as the `with` blocks end.
        with xr.open_dataset(mean_file_path) as mean_ds:
            self.mean = mean_ds["mean"].values.reshape(-1, 1, 1)
        with xr.open_dataset(std_file_path) as std_ds:
            self.std = std_ds["std"].values.reshape(-1, 1, 1)

        if weight_dict is None:
            self.weight_dict = {}
        else:
            # Every label key defaults to weight 1.0; user values override.
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        self.time_table = self._build_time_table()

    def _build_time_table(self):
        """Build datetime list from available .npy files, filtered by years."""
        years = sorted([y for y in os.listdir(self.file_path) if y.isdigit()])

        if self.training:
            target_years = {"2016", "2017", "2018"}
        else:
            # NOTE(review): 2016 is also a training year, so validation overlaps
            # with training here -- confirm whether only "2019" was intended.
            target_years = {"2016", "2019"}

        time_list = []
        for y in years:
            if y not in target_years:
                continue
            year_dir = os.path.join(self.file_path, y)
            files = sorted(os.listdir(year_dir))
            for fname in files:
                # Only the "r_" files are scanned; load_data reads the t/u/v
                # files that carry the same timestamp.
                if fname.startswith("r_") and fname.endswith(".npy"):
                    dt_str = fname[2:12]  # YYYYMMDDHH
                    dt = datetime.datetime.strptime(dt_str, "%Y%m%d%H")
                    time_list.append(dt)

        return sorted(time_list)

    def __len__(self):
        """Return the number of start positions that fit a full input+label window.

        The result is clamped at 0 so that `len()` does not raise when fewer
        time steps than one full window were found.
        """
        return max(len(self.time_table) - self.sq_length * 2 + 1, 0)

    def __getitem__(self, global_idx):
        """Build one (input, label, weight) sample starting at time step `global_idx`.

        Args:
            global_idx: Position in `self.time_table` of the first input step.

        Returns:
            Tuple of three dicts: input, label and weight items.
        """
        # Input steps first, label steps directly after them.
        x_list = [self.load_data(global_idx + m) for m in range(self.sq_length)]
        y_list = [
            self.load_data(global_idx + self.sq_length + n)
            for n in range(self.sq_length)
        ]

        x = np.stack(x_list, axis=0)
        y = np.stack(y_list, axis=0)

        # Normalize
        x = (x - self.mean) / self.std
        y = (y - self.mean) / self.std

        # The same random window is cut out of input and label.
        x, y = self._random_crop(x, y)

        input_item = {self.input_keys[0]: x}
        label_item = {self.label_keys[0]: y}

        # Weights are all-ones-shaped arrays with the same rank as the label.
        weight_shape = [1] * len(next(iter(label_item.values())).shape)
        weight_item = {
            key: np.full(weight_shape, value, paddle.get_default_dtype())
            for key, value in self.weight_dict.items()
        }

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return input_item, label_item, weight_item

    def load_data(self, indices):
        """Load r, t, u, v for a given index.

        Args:
            indices: Position in `self.time_table`.

        Returns:
            The four arrays concatenated along axis 0, in the order r, t, u, v.
        """
        dt = self.time_table[indices]
        year = f"{dt.year:04d}"
        stamp = f"{year}{dt.month:02d}{dt.day:02d}{dt.hour:02d}"

        # One file per variable: <file_path>/<year>/<var>_<YYYYMMDDHH>.npy
        arrays = [
            np.load(os.path.join(self.file_path, year, f"{var}_{stamp}.npy"))
            for var in ("r", "t", "u", "v")
        ]
        return np.concatenate(arrays)

    def _random_crop(self, x, y):
        """Cut the same randomly placed `self.size` window out of `x` and `y`.

        The crop is applied to the last two axes. If `self.size` is a single
        number it is replaced, on the instance, by a square (size, size) tuple.
        """
        if isinstance(self.size, numbers.Number):
            self.size = (int(self.size), int(self.size))

        th, tw = self.size
        h, w = y.shape[-2], y.shape[-1]

        # randint is inclusive on both ends, so the crop may touch the border.
        x1 = random.randint(0, w - tw)
        y1 = random.randint(0, h - th)

        x_cropped = x[..., y1 : y1 + th, x1 : x1 + tw]
        y_cropped = y[..., y1 : y1 + th, x1 : x1 + tw]

        return x_cropped, y_cropped

load_data(indices)

Load r, t, u, v for a given index.

Source code in ppsci/data/dataset/era5meteo_dataset.py
def load_data(self, indices):
    """Load the r, t, u and v arrays of one time step and concatenate them.

    Args:
        indices: Index into ``self.time_table`` selecting the time step.

    Returns:
        The ``np.concatenate`` of the four arrays, in r, t, u, v order, read
        from ``<file_path>/<year>/<var>_<YYYYMMDDHH>.npy``.
    """
    moment = self.time_table[indices]
    year, mon, day, hour = (
        f"{moment.year:04d}",
        f"{moment.month:02d}",
        f"{moment.day:02d}",
        f"{moment.hour:02d}",
    )

    # One file per variable; the loading order fixes the concatenation order.
    fields = [
        np.load(os.path.join(self.file_path, year, f"{var}_{year}{mon}{day}{hour}.npy"))
        for var in ("r", "t", "u", "v")
    ]
    return np.concatenate(fields)

ERA5SampledDataset

Bases: Dataset

Class for ERA5 sampled dataset.

Parameters:

Name Type Description Default
file_path str

Data set path.

required
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.ERA5SampledDataset(
...     file_path="/path/to/ERA5SampledDataset",
...     input_keys=("input",),
...     label_keys=("output",),
... )
>>> # get the length of the dataset
>>> dataset_size = len(dataset)
>>> # get the first sample of the data
>>> first_sample = dataset[0]
>>> print("First sample:", first_sample)
Source code in ppsci/data/dataset/era5_dataset.py
class ERA5SampledDataset(io.Dataset):
    """Dataset serving pre-sampled ERA5 items stored one per HDF5 file.

    Every ``*.h5`` file directly under ``file_path`` is one sample. Its
    ``input_dict`` and ``label_dict`` groups supply the input and label arrays.

    Args:
        file_path (str): Data set path.
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.ERA5SampledDataset(
        ...     file_path="/path/to/ERA5SampledDataset",
        ...     input_keys=("input",),
        ...     label_keys=("output",),
        ... )  # doctest: +SKIP
        >>> # get the length of the dataset
        >>> dataset_size = len(dataset)  # doctest: +SKIP
        >>> # get the first sample of the data
        >>> first_sample = dataset[0]  # doctest: +SKIP
        >>> print("First sample:", first_sample)  # doctest: +SKIP
    """

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        weight_dict: Optional[Dict[str, float]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.transforms = transforms

        if weight_dict is None:
            self.weight_dict = {}
        else:
            # Every label key weighs 1.0 unless the caller overrides it.
            merged = dict.fromkeys(self.label_keys, 1.0)
            merged.update(weight_dict)
            self.weight_dict = merged

        self.files = self.read_data(file_path)
        self.num_samples = len(self.files)

    def read_data(self, path: str):
        """Open every ``*.h5`` file under ``path`` read-only, in sorted order."""
        h5_paths = glob.glob(path + "/*.h5")
        h5_paths.sort()
        return [h5py.File(h5_path, "r") for h5_path in h5_paths]

    def __len__(self):
        """Return the number of opened sample files."""
        return self.num_samples

    def __getitem__(self, global_idx):
        """Build the ``(input, label, weight)`` dicts for one sample file."""
        sample = self.files[global_idx]

        input_item = {
            name: np.asarray(sample["input_dict"][name], paddle.get_default_dtype())
            for name in sample["input_dict"]
        }
        label_item = {
            name: np.asarray(sample["label_dict"][name], paddle.get_default_dtype())
            for name in sample["label_dict"]
        }

        # Weights are all-ones-shaped scalars with the same rank as the first
        # label so that they broadcast against it.
        first_label = next(iter(label_item.values()))
        weight_shape = [1] * len(first_label.shape)
        weight_item = {
            name: np.full(weight_shape, weight, paddle.get_default_dtype())
            for name, weight in self.weight_dict.items()
        }

        if self.transforms is None:
            return input_item, label_item, weight_item

        input_item, label_item, weight_item = self.transforms(
            input_item, label_item, weight_item
        )
        return input_item, label_item, weight_item

ERA5SQDataset

Bases: Dataset

Class for ERA5 dataset.

Parameters:

Name Type Description Default
file_path str

Dataset path.

required
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None
training bool

Whether in train mode. Defaults to True.

True
sq_length int

Length of sequence for time series data. Defaults to 6.

6

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.ERA5SQDataset(
...     file_path="/path/to/ERA5SQDataset",
...     input_keys=("input",),
...     label_keys=("output",),
...     size=(64, 64),
... )
Source code in ppsci/data/dataset/era5sq_dataset.py
class ERA5SQDataset(io.Dataset):
    """Class for ERA5 dataset.

    Each sample pairs ``sq_length`` consecutive time steps of concatenated
    r/t/u/v fields (the input) with the ``sq_length`` following time steps of
    the ``"tp"`` precipitation record (the label). The input is normalized with
    the statistics stored in ``mean.nc`` / ``std.nc`` under ``file_path``, and
    both input and label are cropped with the same random window.

    Args:
        file_path (str): Dataset path.
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        size (Tuple[int, ...]): Crop size as ``(height, width)``. A single number
            is treated as a square crop.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.
        training (bool, optional): Whether in train mode. Defaults to True.
        sq_length (int, optional): Length of sequence for time series data. Defaults to 6.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.ERA5SQDataset(
        ...     file_path="/path/to/ERA5SQDataset",
        ...     input_keys=("input",),
        ...     label_keys=("output",),
        ...     size=(64, 64),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        size: Tuple[int, ...],
        weight_dict: Optional[Dict[str, float]] = None,
        transforms: Optional[vision.Compose] = None,
        training: bool = True,
        sq_length: int = 6,
    ):
        super().__init__()
        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.size = size
        self.training = training
        self.sq_length = sq_length
        self.transforms = transforms

        mean_file_path = os.path.join(self.file_path, "mean.nc")
        std_file_path = os.path.join(self.file_path, "std.nc")

        mean_ds = xr.open_dataset(mean_file_path)
        std_ds = xr.open_dataset(std_file_path)

        # Reshaped to (-1, 1, 1) so the statistics broadcast over the last two axes.
        self.mean = mean_ds["mean"].values.reshape(-1, 1, 1)
        self.std = std_ds["std"].values.reshape(-1, 1, 1)

        if weight_dict is None:
            self.weight_dict = {}
        else:
            # Every label key weighs 1.0 unless the caller overrides it.
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        # NOTE(review): the previous code opened "rain_2016_01.h5" in both the
        # training and the evaluation branch, so `training` never changed which
        # file is read. The duplicated branches are merged here; confirm whether
        # evaluation is supposed to use a different file.
        self.precipitation = h5py.File(
            os.path.join(self.file_path, "rain_2016_01.h5")
        )

        # The "time" record is interpreted as hour offsets from 1900-01-01 00:00.
        start_time = datetime.datetime(1900, 1, 1, 0, 0, 0)
        self.time_table = [
            start_time + datetime.timedelta(hours=int(hours))
            for hours in self.precipitation["time"][:]
        ]

    def __len__(self):
        """Return the number of start indices leaving room for input and label windows."""
        return len(self.time_table) - self.sq_length * 2 + 1

    def __getitem__(self, global_idx):
        """Build the ``(input, label, weight)`` dicts for the window at ``global_idx``."""
        x = np.stack(
            [self.load_data(global_idx + m) for m in range(self.sq_length)], axis=0
        )
        # Labels are the sq_length steps that immediately follow the input window.
        y = np.stack(
            [
                self.precipitation["tp"][global_idx + self.sq_length + n]
                for n in range(self.sq_length)
            ],
            axis=0,
        )
        y = np.expand_dims(y, axis=1)

        # Only the input is normalized; the precipitation label is left as read.
        x = (x - self.mean) / self.std

        x, y = self._random_crop(x, y)

        input_item = {self.input_keys[0]: x}
        label_item = {self.label_keys[0]: y}

        weight_shape = [1] * len(next(iter(label_item.values())).shape)
        weight_item = {
            key: np.full(weight_shape, value, paddle.get_default_dtype())
            for key, value in self.weight_dict.items()
        }

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return input_item, label_item, weight_item

    def load_data(self, indices):
        """Load r, t, u, v for a given index and concatenate them.

        Files are read from ``<file_path>/<year>/<var>_<YYYYMMDDHH>.npy``.
        """
        dt = self.time_table[indices]
        year = str(dt.year)
        mon = f"{dt.month:02d}"
        day = f"{dt.day:02d}"
        hour = f"{dt.hour:02d}"

        fields = [
            np.load(
                os.path.join(self.file_path, year, f"{var}_{year}{mon}{day}{hour}.npy")
            )
            for var in ("r", "t", "u", "v")
        ]
        return np.concatenate(fields)

    def _random_crop(self, x, y):
        """Crop the same randomly placed ``self.size`` window from ``x`` and ``y``.

        The valid offset range is derived from the last two axes of ``y``.
        """
        if isinstance(self.size, numbers.Number):
            self.size = (int(self.size), int(self.size))

        th, tw = self.size
        h, w = y.shape[-2], y.shape[-1]

        x1 = random.randint(0, w - tw)
        y1 = random.randint(0, h - th)

        x_cropped = x[..., y1 : y1 + th, x1 : x1 + tw]
        y_cropped = y[..., y1 : y1 + th, x1 : x1 + tw]

        return x_cropped, y_cropped

ExtMoEENSODataset

Bases: Dataset

The El Niño/Southern Oscillation dataset.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Name of input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Name of label keys, such as ("output",).

required
data_dir str

The directory of data.

required
weight_dict Optional[Dict[str, Union[Callable, float]]]

Define the weight of each constraint variable. Defaults to None.

None
in_len int

The length of input data. Defaults to 12.

12
out_len int

The length of out data. Defaults to 26.

26
in_stride int

The stride of input data. Defaults to 1.

1
out_stride int

The stride of output data. Defaults to 1.

1
train_samples_gap int

The stride of sequence sampling during training. Defaults to 10. e.g., samples_gap = 10, the first seq contains [0, 1, ..., T-1] frame indices, the second seq contains [10, 11, .., T+9]

10
eval_samples_gap int

The stride of sequence sampling during eval. Defaults to 11.

11
normalize_sst bool

Whether to use normalization. Defaults to True.

True
batch_size int

Batch size. Defaults to 1.

1
num_workers int

The num of workers. Defaults to 1.

1
training str

Training phase. Defaults to "train".

'train'
Source code in ppsci/data/dataset/ext_moe_enso_dataset.py
class ExtMoEENSODataset(io.Dataset):
    """The El Niño/Southern Oscillation dataset.

    Args:
        input_keys (Tuple[str, ...]): Name of input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Name of label keys, such as ("output",).
        data_dir (str): The directory of data.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of each constraint variable. Defaults to None.
        in_len (int, optional): The length of input data. Defaults to 12.
        out_len (int, optional): The length of out data. Defaults to 26.
        in_stride (int, optional): The stride of input data. Defaults to 1.
        out_stride (int, optional): The stride of output data. Defaults to 1.
        train_samples_gap (int, optional): The stride of sequence sampling during training. Defaults to 10.
            e.g., samples_gap = 10, the first seq contains [0, 1, ..., T-1] frame indices, the second seq contains [10, 11, .., T+9]
        eval_samples_gap (int, optional): The stride of sequence sampling during eval. Defaults to 11.
        normalize_sst (bool, optional): Whether to use normalization. Defaults to True.
        batch_size (int, optional): Batch size. Defaults to 1.
        num_workers (int, optional): The num of workers. Defaults to 1.
        training (str, optional): Dataset phase, one of "train", "eval" or "test".
            Defaults to "train".

    Raises:
        ModuleNotFoundError: If ``xarray`` is not installed.
        ValueError: If ``num_workers`` is not 1, or if ``training`` is not one of
            "train", "eval" or "test".
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        weight_dict: Optional[Dict[str, float]] = None,
        in_len: int = 12,
        out_len: int = 26,
        in_stride: int = 1,
        out_stride: int = 1,
        train_samples_gap: int = 10,
        eval_samples_gap: int = 11,
        normalize_sst: bool = True,
        batch_size: int = 1,
        num_workers: int = 1,
        training: str = "train",
    ):
        super().__init__()
        if importlib.util.find_spec("xarray") is None:
            raise ModuleNotFoundError(
                "To use ExtMoEENSODataset, please install 'xarray' with: `pip install "
                "xarray` first."
            )
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.data_dir = data_dir
        if weight_dict is None:
            self.weight_dict = {}
        else:
            # Every label key weighs 1.0 unless the caller overrides it.
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        self.in_len = in_len
        self.out_len = out_len
        self.in_stride = in_stride
        self.out_stride = out_stride
        self.train_samples_gap = train_samples_gap
        self.eval_samples_gap = eval_samples_gap
        self.normalize_sst = normalize_sst
        # datamodule_only
        self.batch_size = batch_size
        if num_workers != 1:
            raise ValueError(
                "Current implementation does not support `num_workers != 1`!"
            )
        self.num_workers = num_workers
        self.training = training

        # pre-data
        cmip6sst, cmip5sst, cmip6nino, cmip5nino = read_raw_data(self.data_dir)
        # TODO: more flexible train/val/test split
        # Along the last axis of the CMIP5 arrays: everything but the final two
        # entries is used for training, the second-to-last for eval, the last
        # for test.
        self.sst_train = [cmip6sst, cmip5sst[..., :-2]]
        self.nino_train = [cmip6nino, cmip5nino[..., :-2]]
        self.sst_eval = [cmip5sst[..., -2:-1]]
        self.nino_eval = [cmip5nino[..., -2:-1]]
        self.sst_test = [cmip5sst[..., -1:]]
        self.nino_test = [cmip5nino[..., -1:]]

        self.sst, self.target_nino = self.create_data()

    def create_data(
        self,
    ):
        """Assemble the SST sequences and Nino targets for the current phase.

        Returns:
            Tuple ``(sst, target_nino)``; both are also stored on the instance.

        Raises:
            ValueError: If ``self.training`` is not "train", "eval" or "test".
        """
        if self.training == "train":
            sst_cmip6 = self.sst_train[0]
            nino_cmip6 = self.nino_train[0]
            sst_cmip5 = self.sst_train[1]
            nino_cmip5 = self.nino_train[1]
            samples_gap = self.train_samples_gap
        elif self.training == "eval":
            sst_cmip6 = None
            nino_cmip6 = None
            sst_cmip5 = self.sst_eval[0]
            nino_cmip5 = self.nino_eval[0]
            samples_gap = self.eval_samples_gap
        elif self.training == "test":
            sst_cmip6 = None
            nino_cmip6 = None
            sst_cmip5 = self.sst_test[0]
            nino_cmip5 = self.nino_test[0]
            samples_gap = self.eval_samples_gap
        else:
            # Without this branch an unknown phase surfaced later as an
            # UnboundLocalError on `sst_cmip6`.
            raise ValueError(
                "`training` should be one of 'train', 'eval' or 'test', "
                f"but got {self.training!r}."
            )

        # cmip6 (N, *, 15)
        # cmip5 (N, *, 17)
        sst = []
        target_nino = []

        nino_idx_slice = slice(
            self.in_len, self.in_len + self.out_len - NINO_WINDOW_T + 1
        )  # e.g., 12:36
        if sst_cmip6 is not None:
            assert len(sst_cmip6.shape) == 4
            assert len(nino_cmip6.shape) == 2
            idx_sst = prepare_inputs_targets(
                len_time=sst_cmip6.shape[0],
                input_length=self.in_len,
                input_gap=self.in_stride,
                pred_shift=self.out_len * self.out_stride,
                pred_length=self.out_len,
                samples_gap=samples_gap,
            )

            sst.append(cat_over_last_dim(sst_cmip6[idx_sst]))
            target_nino.append(
                cat_over_last_dim(nino_cmip6[idx_sst[:, nino_idx_slice]])
            )
        if sst_cmip5 is not None:
            assert len(sst_cmip5.shape) == 4
            assert len(nino_cmip5.shape) == 2
            idx_sst = prepare_inputs_targets(
                len_time=sst_cmip5.shape[0],
                input_length=self.in_len,
                input_gap=self.in_stride,
                pred_shift=self.out_len * self.out_stride,
                pred_length=self.out_len,
                samples_gap=samples_gap,
            )
            sst.append(cat_over_last_dim(sst_cmip5[idx_sst]))
            target_nino.append(
                cat_over_last_dim(nino_cmip5[idx_sst[:, nino_idx_slice]])
            )

        # sst data containing both the input and target
        self.sst = np.concatenate(sst, axis=0)  # (N, in_len+out_len, lat, lon)
        if self.normalize_sst:
            self.sst = scale_sst(self.sst)
        # nino data containing the target only
        self.target_nino = np.concatenate(
            target_nino, axis=0
        )  # (N, out_len-NINO_WINDOW_T+1)
        assert self.sst.shape[0] == self.target_nino.shape[0]
        assert self.sst.shape[1] == self.in_len + self.out_len
        assert self.target_nino.shape[1] == self.out_len - NINO_WINDOW_T + 1

        return self.sst, self.target_nino

    def get_datashape(self):
        """Return the shapes of the SST and Nino target arrays."""
        return {"sst": self.sst.shape, "nino target": self.target_nino.shape}

    def __len__(self):
        """Return the number of sequences (first axis of ``self.sst``)."""
        return self.sst.shape[0]

    def __getitem__(self, idx):
        """Return ``(input_item, label_item, weight_item)`` for sequence ``idx``.

        Outside the "train" phase the label additionally carries the Nino target
        under ``label_keys[1]``.
        """
        sst_data = self.sst[idx].astype("float32")
        sst_data = sst_data[..., np.newaxis]
        in_seq = sst_data[: self.in_len, ...]  # (in_len, lat, lon, 1)
        target_seq = sst_data[self.in_len :, ...]  # (out_len, lat, lon, 1)
        weight_item = self.weight_dict

        input_item = {self.input_keys[0]: in_seq, "sst_target": target_seq}
        label_item = {self.label_keys[0]: target_seq}
        if self.training != "train":
            label_item[self.label_keys[1]] = self.target_nino[idx]

        return input_item, label_item, weight_item

IFMMoeDataset

Bases: Dataset

Dataset for IFMMoe.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Name of input data.

required
label_keys Tuple[str, ...]

Name of label data.

required
data_dir str

Directory of IFMMoe data.

required
data_label str

IFMMoe data label in tox21/esol/freesolv/lipop...

required
data_mode str

train/val/test mode data.

required

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.IFMMoeDataset(
...     input_keys=("input",),
...     label_keys=("output",),
...     data_dir="/path/to/IFMMoeDataset",
...     data_label="tox21",
...     data_mode="train",
... )
Source code in ppsci/data/dataset/ifm_moe_dataset.py
class IFMMoeDataset(io.Dataset):
    """Dataset for `IFMMoe`.

    Args:
        input_keys (Tuple[str, ...]): Name of input data.
        label_keys (Tuple[str, ...]): Name of label data. ``__getitem__`` stores
            the targets under ``label_keys[0]`` and the NaN mask under
            ``label_keys[1]``.
        data_dir (str): Directory of IFMMoe data.
        data_label (str): IFMMoe data label in tox21/esol/freesolv/lipop...
        data_mode (str): train/val/test mode data.

    Raises:
        ValueError: If ``data_mode`` is not "train", "val" or "test".

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.IFMMoeDataset(
        ...     input_keys=("input",),
        ...     label_keys=("output",),
        ...     data_dir="/path/to/IFMMoeDataset",
        ...     data_label="tox21",
        ...     data_mode="train",
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False
    use_pgl: bool = False

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        data_label: str,
        data_mode: str,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        self.data_label = data_label
        self.data_dir = data_dir
        self.data_mode = data_mode

        # esol / freesolv / lipop are regression tasks, everything else is
        # treated as classification.
        if data_label in ("esol", "freesolv", "lipop"):
            self.task_type = "reg"
            self.reg = True
            # metric = "rmse"
        else:
            self.task_type = "cla"
            self.reg = False
            # metric = "roc_auc"

        self.task_dict = tasks_dic

        self.Xs = None
        self.Ys = None
        self.mask = None
        self.process_data()

    def process_data(self):
        """Read, filter and standardize the features, then select the requested split.

        Populates ``self.Xs``, ``self.Ys``, ``self.mask``, ``self.data_tr_x`` and,
        for classification tasks, ``self.pos_weights``.

        Raises:
            ValueError: If ``self.data_mode`` is not "train", "val" or "test".
        """
        if self.data_mode not in ("train", "val", "test"):
            # Fail before any file is read; previously an unknown mode only
            # surfaced at the end as an UnboundLocalError on `Xs`.
            raise ValueError(
                "`data_mode` should be one of 'train', 'val' or 'test', "
                f"but got {self.data_mode!r}."
            )

        file_name = os.path.join(self.data_dir, self.data_label + "_moe_pubsubfp.csv")
        # preprocess data
        dataset_all = pd.read_csv(file_name)
        if self.data_label == "freesolv":
            dataset_all.drop(columns=["vsa_pol", "h_emd", "a_donacc"], inplace=True)
        elif self.data_label == "esol":
            dataset_all.drop(columns=["logS", "h_logS", "SlogP"], inplace=True)
        else:
            dataset_all.drop(columns=["SlogP", "h_logD", "logS"], inplace=True)
        tasks = tasks_dic[self.data_label]
        cols = copy.deepcopy(tasks)
        cols.extend(dataset_all.columns[len(tasks) + 1 :])
        # Explicit copy: the frame is mutated in place below, which on a plain
        # column selection triggers pandas' chained-assignment warnings.
        dataset = dataset_all[cols].copy()
        x_cols = dataset_all.columns[len(tasks) + 1 :]
        # remove the features with na
        if self.data_label != "hiv":
            # Drop every feature column containing at least one missing value.
            has_null = dataset[x_cols].isnull().any()
            rm_cols1 = has_null[has_null].index
            dataset.drop(columns=rm_cols1, inplace=True)
        else:
            # For hiv, drop the rows with missing values instead of the columns.
            has_null = dataset[x_cols].isnull().any(axis=1)
            rm_indx1 = has_null[has_null].index
            dataset.drop(index=rm_indx1, inplace=True)
        x_cols = dataset.columns.drop(tasks)

        # Removing features with low variance
        # threshold = 0.05
        data_fea_var = dataset[x_cols].var()
        del_fea1 = list(data_fea_var[data_fea_var <= 0.05].index)
        dataset.drop(columns=del_fea1, inplace=True)
        x_cols = dataset.columns.drop(tasks)

        # pair correlations
        # threshold = 0.95
        data_fea_corr = dataset[x_cols].corr()
        del_fea2_ind = []
        length = data_fea_corr.shape[1]
        for i in range(length):
            for j in range(i + 1, length):
                # Of each highly correlated pair, the later feature is dropped.
                if abs(data_fea_corr.iloc[i, j]) >= 0.95:
                    del_fea2_ind.append(data_fea_corr.index[j])
        dataset.drop(columns=del_fea2_ind, inplace=True)
        # standardize the features
        # NOTE(review): `dataset` holds the task columns first and the features
        # right after, so this slice starts one column past the first feature
        # while the split arrays below use `len(tasks):`. Left unchanged to keep
        # numeric results identical; confirm whether the offset is intended.
        cols_ = dataset.columns[len(tasks) + 1 :]
        dataset[cols_] = dataset[cols_].apply(standardize, axis=0)

        dataseta = pd.read_csv(
            os.path.join(
                self.data_dir, "dataset_used_for_modeling", self.data_label + ".csv"
            )
        )
        data_tr = dataset[dataseta.group == "train"]
        data_va = dataset[dataseta.group == "valid"]
        data_te = dataset[dataseta.group == "test"]

        # training set
        data_tr_y = data_tr[tasks].values.reshape(-1, len(tasks))
        data_tr_x = data_tr.iloc[:, len(tasks) :].values
        # test set
        data_te_y = data_te[tasks].values.reshape(-1, len(tasks))
        data_te_x = data_te.iloc[:, len(tasks) :].values
        # validation set
        data_va_y = data_va[tasks].values.reshape(-1, len(tasks))
        data_va_x = data_va.iloc[:, len(tasks) :].values

        splits = {
            "train": (data_tr_x, data_tr_y),
            "val": (data_va_x, data_va_y),
            "test": (data_te_x, data_te_y),
        }
        Xs, Ys = splits[self.data_mode]
        if not self.reg:
            self.pos_weights = get_pos_weight(dataset[tasks].values)

        self.data_tr_x = data_tr_x
        self.Xs = Xs
        # Missing targets become 0 in Ys; the mask is 1.0 where a target exists.
        self.Ys = np.nan_to_num(Ys)
        self.mask = ~np.isnan(Ys) * 1.0

    def __len__(self):
        """Return the number of samples in the selected split."""
        return len(self.Ys)

    def __getitem__(self, idx):
        """Return ``(input, label, weight)`` dicts of float32 tensors for ``idx``.

        The weight dict is always empty.
        """
        return (
            {
                self.input_keys[0]: paddle.to_tensor(self.Xs[idx], dtype="float32"),
            },
            {
                self.label_keys[0]: paddle.to_tensor(self.Ys[idx], dtype="float32"),
                self.label_keys[1]: paddle.to_tensor(self.mask[idx], dtype="float32"),
            },
            {},
        )

IterableCSVDataset

Bases: IterableDataset

IterableCSVDataset for full-data loading.

Parameters:

Name Type Description Default
file_path str

CSV file path.

required
input_keys Tuple[str, ...]

List of input keys.

required
label_keys Tuple[str, ...]

List of label keys.

required
alias_dict Optional[Dict[str, str]]

Dict of alias(es) for input and label keys. Defaults to None.

None
weight_dict Optional[Dict[str, Union[Callable, float]]]

Define the weight of each constraint variable. Defaults to None.

None
timestamps Optional[Tuple[float, ...]]

The number of repetitions of the data in the time dimension. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.IterableCSVDataset(
...     "/path/to/file.csv",
...     ("x",),
...     ("u",),
... )
Source code in ppsci/data/dataset/csv_dataset.py
class IterableCSVDataset(io.IterableDataset):
    """IterableCSVDataset for full-data loading.

    The whole CSV file is loaded once and converted to tensors; iterating the
    dataset yields that full data exactly once.

    Args:
        file_path (str): CSV file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...]): List of label keys.
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): The number of repetitions of the data
            in the time dimension. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.IterableCSVDataset(
        ...     "/path/to/file.csv",
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # read raw data from file
        raw_data = reader.load_csv_file(file_path, input_keys + label_keys, alias_dict)

        if timestamps is not None:
            if "t" not in raw_data:
                # No time column in the file: repeat the data for every
                # timestamp and expose "t" as an extra leading input key.
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                stacked = misc.combine_array_with_time(stacked, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    stacked, self.input_keys + self.label_keys
                )
            else:
                # The file has a time column: keep only the rows whose time is
                # close to one of the requested timestamps.
                time_column = raw_data["t"]
                hits = [
                    np.nonzero(np.isclose(time_column, ti).flatten())[0]
                    for ti in timestamps
                ]
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                rows = np.concatenate(hits, 0)
                raw_data = misc.convert_to_dict(
                    stacked[rows], self.input_keys + self.label_keys
                )

        # split the raw columns into inputs and labels
        self.input = {
            name: column
            for name, column in raw_data.items()
            if name in self.input_keys
        }
        self.label = {
            name: column
            for name, column in raw_data.items()
            if name in self.label_keys
        }

        def first_label():
            # Looked up lazily so that nothing is touched when it is not needed.
            return next(iter(self.label.values()))

        # prepare weights: ones for every label by default, then the
        # user-supplied constants or callables on top
        self.weight = {}
        if weight_dict is not None:
            self.weight = {name: np.ones_like(first_label()) for name in self.label}
            for name, spec in weight_dict.items():
                if isinstance(spec, (int, float)):
                    self.weight[name] = np.full_like(first_label(), spec)
                elif callable(spec):
                    computed = spec(self.input)
                    self.weight[name] = computed
                    # A callable may return a scalar; broadcast it like a constant.
                    if isinstance(computed, (int, float)):
                        self.weight[name] = np.full_like(first_label(), computed)
                else:
                    raise NotImplementedError(f"type of {type(spec)} is invalid yet.")

        self.input = {name: paddle.to_tensor(arr) for name, arr in self.input.items()}
        self.label = {name: paddle.to_tensor(arr) for name, arr in self.label.items()}
        self.weight = {
            name: paddle.to_tensor(arr) for name, arr in self.weight.items()
        }

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    @property
    def num_samples(self):
        """Number of samples within current dataset."""
        return self._len

    def __iter__(self):
        """Yield the full ``(input, label, weight)`` data once, transformed if configured."""
        if not callable(self.transforms):
            yield self.input, self.label, self.weight
            return

        input_, label_, weight_ = self.transforms(self.input, self.label, self.weight)
        yield input_, label_, weight_

    def __len__(self):
        """Return 1, since the whole data is served as a single item."""
        return 1

num_samples property

Number of samples within current dataset.

IterableMatDataset

Bases: IterableDataset

IterableMatDataset for full-data loading.

Parameters:

Name Type Description Default
file_path str

Mat file path.

required
input_keys Tuple[str, ...]

List of input keys.

required
label_keys Tuple[str, ...]

List of label keys. Defaults to ().

()
alias_dict Optional[Dict[str, str]]

Dict of alias(es) for input and label keys. i.e. {inner_key: outer_key}. Defaults to None.

None
weight_dict Optional[Dict[str, Union[Callable, float]]]

Define the weight of each constraint variable. Defaults to None.

None
timestamps Optional[Tuple[float, ...]]

The number of repetitions of the data in the time dimension. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.IterableMatDataset(
...     "/path/to/file.mat"
...     ("x",),
...     ("u",),
... )
Source code in ppsci/data/dataset/mat_dataset.py
class IterableMatDataset(io.IterableDataset):
    """Iterable dataset that loads a whole .mat file and yields it in one piece.

    Args:
        file_path (str): Mat file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...], optional): List of label keys. Defaults to ().
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            i.e. {inner_key: outer_key}. Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. A number is broadcast to the shape of the first
            label array; a callable is invoked with the input dict. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): Timestamps used to select the matching
            rows when the loaded data contains a "t" key, or to repeat the data along
            the time dimension otherwise (a "t" key is then prepended to the input
            keys). Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.IterableMatDataset(
        ...     "/path/to/file.mat",
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...] = (),
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # Load every requested key (inputs followed by labels) from the file.
        raw_data = reader.load_mat_file(file_path, input_keys + label_keys, alias_dict)

        if timestamps is not None:
            if "t" in raw_data:
                # Keep only the rows whose time value is close to a requested timestamp.
                time_column = raw_data["t"]
                index_groups = [
                    np.nonzero(np.isclose(time_column, stamp).flatten())[0]
                    for stamp in timestamps
                ]
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                selected_rows = np.concatenate(index_groups, 0)
                raw_data = misc.convert_to_dict(
                    stacked[selected_rows], self.input_keys + self.label_keys
                )
            else:
                # No time column in the file: repeat the data for the given timestamps
                # and expose the new "t" column as the first input key.
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                stacked = misc.combine_array_with_time(stacked, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    stacked, self.input_keys + self.label_keys
                )

        # Split the loaded arrays into inputs and labels.
        self.input = {
            name: array for name, array in raw_data.items() if name in self.input_keys
        }
        self.label = {
            name: array for name, array in raw_data.items() if name in self.label_keys
        }

        def _label_shaped(fill_value):
            # Constant array shaped like the first label array.
            return np.full_like(next(iter(self.label.values())), fill_value)

        # Weights default to ones for every label, but only when a weight_dict is given.
        self.weight = {}
        if weight_dict is not None:
            for label_name in self.label:
                self.weight[label_name] = np.ones_like(next(iter(self.label.values())))
            for name, spec in weight_dict.items():
                if isinstance(spec, (int, float)):
                    self.weight[name] = _label_shaped(spec)
                elif callable(spec):
                    computed = spec(self.input)
                    if isinstance(computed, (int, float)):
                        computed = _label_shaped(computed)
                    self.weight[name] = computed
                else:
                    raise NotImplementedError(f"type of {type(spec)} is invalid yet.")

        # Convert everything to tensors once, up front.
        self.input = {name: paddle.to_tensor(array) for name, array in self.input.items()}
        self.label = {name: paddle.to_tensor(array) for name, array in self.label.items()}
        self.weight = {
            name: paddle.to_tensor(array) for name, array in self.weight.items()
        }

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    @property
    def num_samples(self):
        """Number of samples within current dataset."""
        return self._len

    def __iter__(self):
        """Yield the full ``(input, label, weight)`` dicts once, after optional transforms."""
        if not callable(self.transforms):
            yield self.input, self.label, self.weight
            return

        transformed_input, transformed_label, transformed_weight = self.transforms(
            self.input, self.label, self.weight
        )
        yield transformed_input, transformed_label, transformed_weight

    def __len__(self):
        """Return 1, because iteration yields the whole data exactly once."""
        return 1

num_samples property

Number of samples within current dataset.

IterableNamedArrayDataset

Bases: IterableDataset

IterableNamedArrayDataset for full-data loading.

Parameters:

Name Type Description Default
input Dict[str, ndarray]

Input dict.

required
label Optional[Dict[str, ndarray]]

Label dict. Defaults to None.

None
weight Optional[Dict[str, ndarray]]

Weight dict. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import ppsci
>>> input = {"x": np.random.randn(100, 1)}
>>> label = {"u": np.random.randn(100, 1)}
>>> weight = {"u": np.random.randn(100, 1)}
>>> dataset = ppsci.data.dataset.IterableNamedArrayDataset(input, label, weight)
Source code in ppsci/data/dataset/array_dataset.py
class IterableNamedArrayDataset(io.IterableDataset):
    """Iterable dataset over in-memory named arrays, yielded in one piece.

    Args:
        input (Dict[str, np.ndarray]): Input dict.
        label (Optional[Dict[str, np.ndarray]]): Label dict. Defaults to None.
        weight (Optional[Dict[str, np.ndarray]]): Weight dict. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> input = {"x": np.random.randn(100, 1)}
        >>> label = {"u": np.random.randn(100, 1)}
        >>> weight = {"u": np.random.randn(100, 1)}
        >>> dataset = ppsci.data.dataset.IterableNamedArrayDataset(input, label, weight)
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        input: Dict[str, np.ndarray],
        label: Optional[Dict[str, np.ndarray]] = None,
        weight: Optional[Dict[str, np.ndarray]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input = {name: paddle.to_tensor(array) for name, array in input.items()}

        # A missing label dict becomes an empty dict ...
        if label is None:
            self.label = {}
        else:
            self.label = {
                name: paddle.to_tensor(array) for name, array in label.items()
            }
        self.input_keys = tuple(input.keys())
        self.label_keys = tuple(self.label.keys())

        # ... whereas a missing weight dict stays None.
        if weight is None:
            self.weight = None
        else:
            # Weights are cast to the framework default dtype.
            self.weight = {
                name: paddle.to_tensor(array, paddle.get_default_dtype())
                for name, array in weight.items()
            }

        self._len = len(next(iter(self.input.values())))
        self.transforms = transforms
        # Cached distributed context, used by __iter__ when world size > 1.
        self.world_size_ = dist.get_world_size()
        self.rank_ = dist.get_rank()

    @property
    def num_samples(self):
        """Number of samples within current dataset."""
        return self._len

    def __iter__(self):
        """Yield the ``(input, label, weight)`` triple once.

        Transforms are applied first when ``self.transforms`` is callable. With
        more than one process, each part is then passed through
        ``_group_dict_into_local_rank`` with this process's rank and world size.
        """
        triple = (self.input, self.label, self.weight)
        if callable(self.transforms):
            triple = self.transforms(*triple)
        input_, label_, weight_ = triple

        if self.world_size_ > 1:
            input_, label_, weight_ = [
                _group_dict_into_local_rank(part, self.rank_, self.world_size_)
                for part in (input_, label_, weight_)
            ]

        yield input_, label_, weight_

    def __len__(self):
        """Return 1, because iteration yields the whole data exactly once."""
        return 1

num_samples property

Number of samples within current dataset.

IterableNPZDataset

Bases: IterableDataset

IterableNPZDataset for full-data loading.

Parameters:

Name Type Description Default
file_path str

Npz file path.

required
input_keys Tuple[str, ...]

List of input keys.

required
label_keys Tuple[str, ...]

List of label keys. Defaults to ().

()
alias_dict Optional[Dict[str, str]]

Dict of alias(es) for input and label keys. i.e. {inner_key: outer_key}. Defaults to None.

None
weight_dict Optional[Dict[str, Union[Callable, float]]]

Define the weight of each constraint variable. Defaults to None.

None
timestamps Optional[Tuple[float, ...]]

Timestamps used to select the matching rows when the loaded data contains a "t" key, or to repeat the data along the time dimension otherwise. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.IterableNPZDataset(
...     "/path/to/file.npz",
...     ("x",),
...     ("u",),
... )
Source code in ppsci/data/dataset/npz_dataset.py
class IterableNPZDataset(io.IterableDataset):
    """Iterable dataset that loads a whole .npz file and yields it in one piece.

    Args:
        file_path (str): Npz file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...], optional): List of label keys. Defaults to ().
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            i.e. {inner_key: outer_key}. Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. A number is broadcast to the shape of the first
            label array; a callable is invoked with the input dict. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): Timestamps used to select the matching
            rows when the loaded data contains a "t" key, or to repeat the data along
            the time dimension otherwise (a "t" key is then prepended to the input
            keys). Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.IterableNPZDataset(
        ...     "/path/to/file.npz",
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...] = (),
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # Load every requested key (inputs followed by labels) from the file.
        raw_data = reader.load_npz_file(file_path, input_keys + label_keys, alias_dict)

        if timestamps is not None:
            if "t" in raw_data:
                # Keep only the rows whose time value is close to a requested timestamp.
                time_column = raw_data["t"]
                index_groups = [
                    np.nonzero(np.isclose(time_column, stamp).flatten())[0]
                    for stamp in timestamps
                ]
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                selected_rows = np.concatenate(index_groups, 0)
                raw_data = misc.convert_to_dict(
                    stacked[selected_rows], self.input_keys + self.label_keys
                )
            else:
                # No time column in the file: repeat the data for the given timestamps
                # and expose the new "t" column as the first input key.
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                stacked = misc.combine_array_with_time(stacked, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    stacked, self.input_keys + self.label_keys
                )

        # Split the loaded arrays into inputs and labels.
        self.input = {
            name: array for name, array in raw_data.items() if name in self.input_keys
        }
        self.label = {
            name: array for name, array in raw_data.items() if name in self.label_keys
        }

        def _label_shaped(fill_value):
            # Constant array shaped like the first label array.
            return np.full_like(next(iter(self.label.values())), fill_value)

        # Only the keys named in weight_dict receive a weight entry.
        self.weight = {}
        if weight_dict is not None:
            for name, spec in weight_dict.items():
                if isinstance(spec, (int, float)):
                    self.weight[name] = _label_shaped(spec)
                elif callable(spec):
                    computed = spec(self.input)
                    if isinstance(computed, (int, float)):
                        computed = _label_shaped(computed)
                    self.weight[name] = computed
                else:
                    raise NotImplementedError(f"type of {type(spec)} is invalid yet.")

        # Convert everything to tensors once, up front.
        self.input = {name: paddle.to_tensor(array) for name, array in self.input.items()}
        self.label = {name: paddle.to_tensor(array) for name, array in self.label.items()}
        self.weight = {
            name: paddle.to_tensor(array) for name, array in self.weight.items()
        }

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    @property
    def num_samples(self):
        """Number of samples within current dataset."""
        return self._len

    def __iter__(self):
        """Yield the full ``(input, label, weight)`` dicts once, after optional transforms."""
        if not callable(self.transforms):
            yield self.input, self.label, self.weight
            return

        transformed_input, transformed_label, transformed_weight = self.transforms(
            self.input, self.label, self.weight
        )
        yield transformed_input, transformed_label, transformed_weight

    def __len__(self):
        """Return 1, because iteration yields the whole data exactly once."""
        return 1

num_samples property

Number of samples within current dataset.

LorenzDataset

Bases: Dataset

Dataset for training Lorenz model.

Parameters:

Name Type Description Default
file_path str

Data set path.

required
input_keys Tuple[str, ...]

Input keys, such as ("states",).

required
label_keys Tuple[str, ...]

Output keys, such as ("pred_states", "recover_states").

required
block_size int

Data block size.

required
stride int

Data stride.

required
ndata Optional[int]

Number of data series to use. Defaults to None.

None
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
embedding_model Optional[Arch]

Embedding model. Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.LorenzDataset(
...     file_path="/path/to/LorenzDataset",
...     input_keys=("x",),
...     label_keys=("v",),
...     block_size=32,
...     stride=16,
... )
Source code in ppsci/data/dataset/trphysx_dataset.py
class LorenzDataset(io.Dataset):
    """Dataset for training Lorenz model.

    Every series stored in the HDF5 file is cut into windows of ``block_size``
    steps taken every ``stride`` steps. When an embedding model is given, all
    windows are encoded once at construction time.

    Args:
        file_path (str): Data set path.
        input_keys (Tuple[str, ...]): Input keys, such as ("states",).
        label_keys (Tuple[str, ...]): Output keys, such as ("pred_states", "recover_states").
        block_size (int): Data block size.
        stride (int): Data stride.
        ndata (Optional[int]): Number of data series to use. Defaults to None.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        embedding_model (Optional[base.Arch]): Embedding model. Defaults to None.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.LorenzDataset(
        ...     file_path="/path/to/LorenzDataset",
        ...     input_keys=("x",),
        ...     label_keys=("v",),
        ...     block_size=32,
        ...     stride=16,
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        block_size: int,
        stride: int,
        ndata: Optional[int] = None,
        weight_dict: Optional[Dict[str, float]] = None,
        embedding_model: Optional[base.Arch] = None,
    ):
        super().__init__()
        if not os.path.exists(file_path):
            raise FileNotFoundError(
                f"file_path({file_path}) not exists. Please download dataset first. "
                "Training: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/lorenz_training_rk.hdf5. "
                "Valid: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/lorenz_valid_rk.hdf5."
            )

        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys

        self.block_size = block_size
        self.stride = stride
        self.ndata = ndata
        # Every label key defaults to weight 1.0; user-supplied entries override.
        self.weight_dict = {key: 1.0 for key in self.label_keys}
        if weight_dict is not None:
            self.weight_dict.update(weight_dict)

        self.data = self.read_data(file_path, block_size, stride)
        self.embedding_model = embedding_model
        if embedding_model is None:
            self.embedding_data = None
        else:
            # Encode all windows once, without gradients, and cache the result as numpy.
            embedding_model.eval()
            with paddle.no_grad():
                data_tensor = paddle.to_tensor(self.data)
                embedding_data_tensor = embedding_model.encoder(data_tensor)
            self.embedding_data = embedding_data_tensor.numpy()

    def read_data(self, file_path: str, block_size: int, stride: int):
        """Read the series in ``file_path`` and cut them into fixed-size windows.

        Args:
            file_path (str): Path of the HDF5 file.
            block_size (int): Number of steps in each window.
            stride (int): Step between the starts of two consecutive windows.

        Returns:
            np.ndarray: Array built from all collected windows; reading stops once
                ``self.ndata`` series were processed (when ``self.ndata`` is set).
        """
        data = []
        with h5py.File(file_path, "r") as f:
            data_num = 0
            for key in f.keys():
                data_series = np.asarray(f[key], dtype=paddle.get_default_dtype())
                # Only full windows are kept; a trailing partial window is dropped.
                for i in range(0, data_series.shape[0] - block_size + 1, stride):
                    data.append(data_series[i : i + block_size])
                data_num += 1
                if self.ndata is not None and data_num >= self.ndata:
                    break
        return np.asarray(data)

    def __len__(self):
        """Number of windows in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(input, label, weight)`` dicts of the window at ``idx``.

        Without embedding data the input is the raw window, the first label is the
        window shifted by one step and the optional second label is the raw window.
        With embedding data the input/first label are the embedded window without
        its last/first step, and the optional second label is the raw window
        without its first step.
        """
        if self.embedding_data is None:
            data_item = self.data[idx]
            input_item = {self.input_keys[0]: data_item}
            label_item = {self.label_keys[0]: data_item[1:, :]}
            # The second label is optional, mirroring the embedding branch below;
            # indexing label_keys[1] unconditionally raised IndexError for one key.
            if len(self.label_keys) > 1:
                label_item[self.label_keys[1]] = data_item
        else:
            data_item = self.embedding_data[idx]
            input_item = {self.input_keys[0]: data_item[:-1, :]}
            label_item = {self.label_keys[0]: data_item[1:, :]}
            if len(self.label_keys) == 2:
                label_item[self.label_keys[1]] = self.data[idx][1:, :]

        # Each weight is a constant array with one singleton axis per data axis.
        weight_shape = [1] * len(data_item.shape)
        weight_item = {
            key: np.full(weight_shape, value, paddle.get_default_dtype())
            for key, value in self.weight_dict.items()
        }
        return (input_item, label_item, weight_item)

MatDataset

Bases: Dataset

Dataset class for .mat file.

Parameters:

Name Type Description Default
file_path str

Mat file path.

required
input_keys Tuple[str, ...]

List of input keys.

required
label_keys Tuple[str, ...]

List of label keys. Defaults to ().

()
alias_dict Optional[Dict[str, str]]

Dict of alias(es) for input and label keys. i.e. {inner_key: outer_key}. Defaults to None.

None
weight_dict Optional[Dict[str, Union[Callable, float]]]

Define the weight of each constraint variable. Defaults to None.

None
timestamps Optional[Tuple[float, ...]]

Timestamps used to select the matching rows when the loaded data contains a "t" key, or to repeat the data along the time dimension otherwise. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.MatDataset(
...     "/path/to/file.mat",
...     ("x",),
...     ("u",),
... )
Source code in ppsci/data/dataset/mat_dataset.py
class MatDataset(io.Dataset):
    """Map-style dataset backed by a .mat file.

    Args:
        file_path (str): Mat file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...], optional): List of label keys. Defaults to ().
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            i.e. {inner_key: outer_key}. Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. A number is broadcast to the shape of the first
            label array; a callable is invoked with the input dict. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): Timestamps used to select the matching
            rows when the loaded data contains a "t" key, or to repeat the data along
            the time dimension otherwise (a "t" key is then prepended to the input
            keys). Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.MatDataset(
        ...     "/path/to/file.mat",
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = True

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...] = (),
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # Load every requested key (inputs followed by labels) from the file.
        raw_data = reader.load_mat_file(file_path, input_keys + label_keys, alias_dict)

        if timestamps is not None:
            if "t" in raw_data:
                # Keep only the rows whose time value is close to a requested timestamp.
                time_column = raw_data["t"]
                index_groups = [
                    np.nonzero(np.isclose(time_column, stamp).flatten())[0]
                    for stamp in timestamps
                ]
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                selected_rows = np.concatenate(index_groups, 0)
                raw_data = misc.convert_to_dict(
                    stacked[selected_rows], self.input_keys + self.label_keys
                )
            else:
                # No time column in the file: repeat the data for the given timestamps
                # and expose the new "t" column as the first input key.
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                stacked = misc.combine_array_with_time(stacked, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    stacked, self.input_keys + self.label_keys
                )

        # Split the loaded arrays into inputs and labels.
        self.input = {
            name: array for name, array in raw_data.items() if name in self.input_keys
        }
        self.label = {
            name: array for name, array in raw_data.items() if name in self.label_keys
        }

        def _label_shaped(fill_value):
            # Constant array shaped like the first label array.
            return np.full_like(next(iter(self.label.values())), fill_value)

        # Weights default to ones for every label, but only when a weight_dict is given.
        self.weight = {}
        if weight_dict is not None:
            for label_name in self.label:
                self.weight[label_name] = np.ones_like(next(iter(self.label.values())))
            for name, spec in weight_dict.items():
                if isinstance(spec, (int, float)):
                    self.weight[name] = _label_shaped(spec)
                elif callable(spec):
                    computed = spec(self.input)
                    if isinstance(computed, (int, float)):
                        computed = _label_shaped(computed)
                    self.weight[name] = computed
                else:
                    raise NotImplementedError(f"type of {type(spec)} is invalid yet.")

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    def __getitem__(self, idx):
        """Return the ``(input, label, weight)`` dicts indexed by ``idx``.

        Transforms are applied to the three dicts when ``self.transforms`` is set.
        """
        input_item, label_item, weight_item = (
            {name: array[idx] for name, array in group.items()}
            for group in (self.input, self.label, self.weight)
        )

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return (input_item, label_item, weight_item)

    def __len__(self):
        """Number of samples, i.e. the length of the first input array."""
        return self._len

MeshAirfoilDataset

Bases: Dataset

Dataset for MeshAirfoil.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Name of input data.

required
label_keys Tuple[str, ...]

Name of label data.

required
data_dir str

Directory of MeshAirfoil data.

required
mesh_graph_path str

Path of mesh graph.

required
transpose_edges bool

Whether to transpose the edges array from (2, num_edges) to (num_edges, 2) for convenience of slicing.

False

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.MeshAirfoilDataset(
...     input_keys=("input",),
...     label_keys=("output",),
...     data_dir="/path/to/MeshAirfoilDataset",
...     mesh_graph_path="/path/to/file.su2",
...     transpose_edges=False,
... )
Source code in ppsci/data/dataset/airfoil_dataset.py
class MeshAirfoilDataset(io.Dataset):
    """Dataset for `MeshAirfoil`.

    All samples found in ``data_dir`` are loaded and converted into graphs at
    construction time. Normalization factors are read from ``train_max_min.pkl``
    located via ``osp.dirname(data_dir)``.

    Args:
        input_keys (Tuple[str, ...]): Name of input data.
        label_keys (Tuple[str, ...]): Name of label data.
        data_dir (str): Directory of MeshAirfoil data.
        mesh_graph_path (str): Path of mesh graph.
        transpose_edges (bool, optional): Whether to transpose the edges array from
            (2, num_edges) to (num_edges, 2) for convenience of slicing. Defaults to False.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.MeshAirfoilDataset(
        ...     input_keys=("input",),
        ...     label_keys=("output",),
        ...     data_dir="/path/to/MeshAirfoilDataset",
        ...     mesh_graph_path="/path/to/file.su2",
        ...     transpose_edges=False,
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    use_pgl: bool = True

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        mesh_graph_path: str,
        transpose_edges: bool = False,
    ):
        # Initialize the base dataset, consistent with the other dataset classes.
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.data_dir = data_dir
        self.file_list = os.listdir(self.data_dir)
        self.len = len(self.file_list)
        self.mesh_graph = _get_mesh_graph(mesh_graph_path)

        # NOTE: pickle.load can execute arbitrary code; only use trusted dataset files.
        with open(osp.join(osp.dirname(self.data_dir), "train_max_min.pkl"), "rb") as f:
            self.normalization_factors = pickle.load(f)

        self.nodes = self.mesh_graph[0]
        self.edges = self.mesh_graph[1]
        if transpose_edges:
            self.edges = self.edges.transpose([1, 0])
        self.elems_list = self.mesh_graph[2]
        self.marker_dict = self.mesh_graph[3]
        # Nodes that belong to no marker keep the value -1; the others get the
        # position of their marker within marker_dict.
        self.node_markers = np.full([self.nodes.shape[0], 1], fill_value=-1)
        for marker_id, marker_elems in enumerate(self.marker_dict.values()):
            for elem in marker_elems:
                self.node_markers[elem[0]] = marker_id
                self.node_markers[elem[1]] = marker_id

        # Build every graph eagerly so that __getitem__ is a plain lookup.
        self.raw_graphs = [self.get(i) for i in range(len(self))]

    def __len__(self):
        """Number of files found in ``data_dir``."""
        return self.len

    def __getitem__(self, idx):
        """Return ``(input_dict, label_dict, None)`` for the graph at ``idx``.

        The same pre-built graph object is used as both the input and the label
        value; no weight is provided.
        """
        return (
            {
                self.input_keys[0]: self.raw_graphs[idx],
            },
            {
                self.label_keys[0]: self.raw_graphs[idx],
            },
            None,
        )

    def get(self, idx):
        """Load the sample file at position ``idx`` and build its graph.

        Args:
            idx (int): Index into ``self.file_list``.

        Returns:
            pgl.Graph: Graph carrying node features (``x``), normalized fields
                (``y``), node positions (``pos``), edges (``edge_index``),
                standardized edge lengths (``edge_attr``) and the flow parameters.
        """
        # NOTE: pickle.load can execute arbitrary code; only use trusted dataset files.
        with open(osp.join(self.data_dir, self.file_list[idx]), "rb") as f:
            fields = pickle.load(f)
        fields = self._preprocess(fields)
        aoa, reynolds, mach = self._get_params_from_name(self.file_list[idx])
        # Use the Reynolds number when the file name provides one, else the Mach number.
        mach_or_reynolds = mach if reynolds is None else reynolds
        norm_aoa = aoa / 10
        norm_mach_or_reynolds = (
            mach_or_reynolds if reynolds is None else (mach_or_reynolds - 1.5e6) / 1.5e6
        )

        # Node features: mesh node data, normalized aoa, normalized Mach/Reynolds, marker.
        nodes = np.concatenate(
            [
                self.nodes,
                np.repeat(a=norm_aoa, repeats=self.nodes.shape[0])[:, np.newaxis],
                np.repeat(a=norm_mach_or_reynolds, repeats=self.nodes.shape[0])[
                    :, np.newaxis
                ],
                self.node_markers,
            ],
            axis=-1,
        ).astype(paddle.get_default_dtype())

        data = pgl.Graph(
            num_nodes=nodes.shape[0],
            edges=self.edges,
        )
        data.x = nodes
        data.y = fields
        data.pos = self.nodes
        data.edge_index = self.edges

        # Edge attribute: Euclidean distance between the first two feature
        # columns of the two end nodes of each edge.
        sender = data.x[data.edge_index[0]]
        receiver = data.x[data.edge_index[1]]
        relation_pos = sender[:, 0:2] - receiver[:, 0:2]
        post = np.linalg.norm(relation_pos, ord=2, axis=1, keepdims=True).astype(
            paddle.get_default_dtype()
        )
        data.edge_attr = post
        # Standardize edge lengths; the epsilon floor avoids division by zero.
        std_epsilon = [1e-8]
        a = np.mean(data.edge_attr, axis=0)
        b = data.edge_attr.std(axis=0)
        b = np.maximum(b, std_epsilon).astype(paddle.get_default_dtype())
        data.edge_attr = (data.edge_attr - a) / b
        data.aoa = aoa
        data.norm_aoa = norm_aoa
        data.mach_or_reynolds = mach_or_reynolds
        data.norm_mach_or_reynolds = norm_mach_or_reynolds
        return data

    def _preprocess(self, tensor_list, stack_output=True):
        """Scale every field to [-1, 1] using the stored (max, min) factors.

        Args:
            tensor_list: Sequence of field arrays; entry ``i`` is scaled with
                ``data_max[i]`` and ``data_min[i]``.
            stack_output (bool): Whether to stack the scaled fields along axis 1.
                Defaults to True.

        Returns:
            The stacked array when ``stack_output`` is True, else a list of arrays.
        """
        data_max, data_min = self.normalization_factors
        normalized_tensors = [
            (tensor - data_min[i]) / (data_max[i] - data_min[i]) * 2 - 1
            for i, tensor in enumerate(tensor_list)
        ]
        if stack_output:
            normalized_tensors = np.stack(normalized_tensors, axis=1)
        return normalized_tensors

    def _get_params_from_name(self, filename):
        """Parse ``aoa``, ``re`` and ``mach`` values from an underscore-separated file name.

        The value of each parameter is the token following its tag once the
        extension is removed. A ``re`` token equal to "None" yields ``None``.

        Returns:
            Tuple of ``(aoa, reynolds, mach)``; each present value is a 1-element
            array of the default dtype.
        """
        s = filename.rsplit(".", 1)[0].split("_")
        aoa = np.array(s[s.index("aoa") + 1])[np.newaxis].astype(
            paddle.get_default_dtype()
        )
        reynolds = s[s.index("re") + 1]
        reynolds = (
            np.array(reynolds)[np.newaxis].astype(paddle.get_default_dtype())
            if reynolds != "None"
            else None
        )
        mach = np.array(s[s.index("mach") + 1])[np.newaxis].astype(
            paddle.get_default_dtype()
        )
        return aoa, reynolds, mach

MeshCylinderDataset

Bases: Dataset

Dataset for MeshCylinder.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Name of input data.

required
label_keys Tuple[str, ...]

Name of label data.

required
data_dir str

Directory of MeshCylinder data.

required
mesh_graph_path str

Path of mesh graph.

required

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.MeshCylinderDataset(
...     input_keys=("input",),
...     label_keys=("output",),
...     data_dir="/path/to/MeshCylinderDataset",
...     mesh_graph_path="/path/to/file.su2",
... )
Source code in ppsci/data/dataset/cylinder_dataset.py
class MeshCylinderDataset(io.Dataset):
    """Dataset for `MeshCylinder`.

    Every file found in `data_dir` is parsed into a graph when the dataset is
    constructed; `__getitem__` then serves the cached graph as both the input
    and the label entry.

    Args:
        input_keys (Tuple[str, ...]): Name of input data.
        label_keys (Tuple[str, ...]): Name of label data.
        data_dir (str): Directory of MeshCylinder data.
        mesh_graph_path (str): Path of mesh graph.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.MeshCylinderDataset(
        ...     input_keys=("input",),
        ...     label_keys=("output",),
        ...     data_dir="/path/to/MeshCylinderDataset",
        ...     mesh_graph_path="/path/to/file.su2",
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    use_pgl: bool = True

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        mesh_graph_path: str,
    ):
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.data_dir = data_dir
        self.file_list = os.listdir(self.data_dir)
        self.len = len(self.file_list)
        self.mesh_graph = airfoil_dataset._get_mesh_graph(mesh_graph_path)

        # Row 0 holds the per-channel maxima, row 1 the per-channel minima.
        self.normalization_factors = np.array(
            [[978.6001, 48.9258, 24.8404], [-692.3159, -6.9950, -24.8572]],
            dtype=paddle.get_default_dtype(),
        )

        self.nodes = self.mesh_graph[0]
        self.meshnodes = self.mesh_graph[0]
        self.edges = self.mesh_graph[1]
        self.elems_list = self.mesh_graph[2]
        self.marker_dict = self.mesh_graph[3]
        self.bounder = []
        # -1 marks nodes that belong to no marker; otherwise the value is the
        # position of the marker in `marker_dict`.
        self.node_markers = np.full([self.nodes.shape[0], 1], fill_value=-1)
        for i, marker_elems in enumerate(self.marker_dict.values()):
            for elem in marker_elems:
                self.node_markers[elem[0]] = i
                self.node_markers[elem[1]] = i

        # Parse every sample eagerly so that __getitem__ is a plain lookup.
        self.raw_graphs = [self.get(i) for i in range(len(self))]

    def __len__(self):
        return self.len

    def __getitem__(self, idx):
        return (
            {
                self.input_keys[0]: self.raw_graphs[idx],
            },
            {
                self.label_keys[0]: self.raw_graphs[idx],
            },
            None,
        )

    def get(self, idx):
        """Build the graph for the ``idx``-th entry of ``self.file_list``.

        As a side effect, ``self.bounder`` is refreshed with the mesh-node
        indices matching the positions listed in the ``bounder`` file located
        in the parent directory of ``data_dir``.

        Args:
            idx (int): Position of the sample file in ``self.file_list``.

        Returns:
            pgl.Graph: Graph carrying node features (``x``), standardized fields
            (``y``), node positions (``pos``), edges, standardized edge lengths
            (``edge_attr``) and the parameter parsed from the file name
            (``velocity``).
        """
        sample_name = self.file_list[idx]
        pos, field = self._read_pos_and_field(osp.join(self.data_dir, sample_name))

        # Reorder the rows of the file so that row i belongs to mesh node i.
        indexlist = self._match_rows(self.meshnodes, pos)
        fields = field[indexlist]
        velocity = self._get_params_from_name(sample_name)

        norm_aoa = velocity / 40
        # add physics parameters to graph
        nodes = np.concatenate(
            [
                self.nodes,
                np.repeat(a=norm_aoa, repeats=self.nodes.shape[0])[:, np.newaxis],
                self.node_markers,
            ],
            axis=-1,
        ).astype(paddle.get_default_dtype())

        data = pgl.Graph(
            num_nodes=nodes.shape[0],
            edges=self.edges,
        )
        data.x = nodes
        data.y = fields
        data.pos = self.nodes
        data.edge_index = self.edges
        data.velocity = velocity

        # Edge attribute: Euclidean length of each edge, then standardized.
        sender = data.x[data.edge_index[0]]
        receiver = data.x[data.edge_index[1]]
        relation_pos = sender[:, 0:2] - receiver[:, 0:2]
        post = np.linalg.norm(relation_pos, ord=2, axis=1, keepdims=True).astype(
            paddle.get_default_dtype()
        )
        data.edge_attr = post
        std_epsilon = [1e-8]
        mean = np.mean(data.edge_attr, axis=0)
        std = data.edge_attr.std(axis=0)
        # Clamp the deviation to avoid dividing by zero.
        std = np.maximum(std, std_epsilon).astype(paddle.get_default_dtype())
        data.edge_attr = (data.edge_attr - mean) / std

        mean = np.mean(data.y, axis=0)
        std = data.y.std(axis=0)
        std = np.maximum(std, std_epsilon).astype(paddle.get_default_dtype())
        data.y = (data.y - mean) / std
        # NOTE: despite their names these attributes hold the mean and the
        # (clamped) standard deviation used to standardize `y`.
        data.norm_max = mean
        data.norm_min = std

        # find the face of the boundary,our cylinder dataset come from fluent solver
        boundary_pos, _ = self._read_pos_and_field(
            osp.join(osp.dirname(self.data_dir), "bounder")
        )
        self.bounder = self._match_rows(boundary_pos, self.nodes)
        return data

    @staticmethod
    def _read_pos_and_field(path):
        """Parse a comma-separated text file into position and field arrays.

        The first line is skipped. On every other line columns 1-2 are read as
        the position and columns 3 onward as the field values.

        Returns:
            Tuple ``(pos, field)`` of arrays stacked along axis 0.
        """
        dtype = paddle.get_default_dtype()
        pos = []
        field = []
        with open(path, "r") as f:
            for line in f.read().splitlines()[1:]:
                columns = line.split(",")
                # Numbers are parsed with float(); the previous eval() would
                # have executed arbitrary expressions found in the data file.
                pos.append(np.array([float(c) for c in columns[1:3]], dtype))
                field.append(np.array([float(c) for c in columns[3:]], dtype))
        field = np.stack(field, axis=0)
        pos = np.stack(pos, axis=0)
        return pos, field

    @staticmethod
    def _match_rows(targets, candidates):
        """For every row of `targets`, locate the equal row(s) in `candidates`.

        Rows are compared element-wise for exact equality. The per-row results
        of ``np.nonzero`` are stacked and squeezed, so every target is expected
        to match the same number of candidate rows.
        """
        matches = []
        for row in range(targets.shape[0]):
            target = np.squeeze(targets[row : (row + 1)])
            hit = np.nonzero(
                np.sum(
                    (candidates == target), axis=1, dtype=paddle.get_default_dtype()
                )
                == candidates.shape[1]
            )
            matches.append(hit)
        return np.squeeze(np.stack(matches, axis=0))

    def _get_params_from_name(self, filename):
        """Parse the numeric parameter encoded in a sample file name.

        The extension is dropped and everything after the first 13 characters
        of the stem is converted to a one-element array of the default dtype
        (presumably the file names share a fixed 13-character prefix — confirm
        against the dataset).
        """
        s = filename.rsplit(".", 1)[0]
        reynolds = np.array(s[13:])[np.newaxis].astype(paddle.get_default_dtype())
        return reynolds

MoleculeDatasetIter

Bases: IterableDataset

Source code in ppsci/data/dataset/synthemol_dataset.py
class MoleculeDatasetIter(io.IterableDataset):
    """Iterable dataset that re-packs batches of a chemprop data loader.

    Args:
        input_keys (Tuple[str, ...]): Names for the five yielded inputs, in this
            order: molecule batch, features, atom descriptors, atom features,
            bond features.
        label_keys (Tuple[str, ...]): Names for the four yielded labels, in this
            order: targets, data weights, mask, target weights.
        args: Object exposing the ``target_weights`` and ``loss_function``
            attributes read while iterating.
        smiles (list[str]): Forwarded to ``chemprop_build_data_loader``.
        fingerprints (np.ndarray): Forwarded to ``chemprop_build_data_loader``.
        properties (list[int]): Forwarded to ``chemprop_build_data_loader``.
        shuffle (bool): Forwarded to ``chemprop_build_data_loader``.
        num_workers (int): Forwarded to ``chemprop_build_data_loader``.
    """

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        args,
        smiles: list[str],
        fingerprints: np.ndarray = None,
        properties: list[int] = None,
        shuffle: bool = False,
        num_workers: int = 0,
    ):
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.args = args

        self.data_loader = chemprop_build_data_loader(
            smiles, fingerprints, properties, shuffle, num_workers
        )

    def __iter__(self):
        """Yield ``(input_dict, label_dict, {})`` for every loader batch."""
        for (
            mol_batch,
            features_batch,
            target_batch,
            mask_batch,
            atom_descriptors_batch,
            atom_features_batch,
            bond_features_batch,
            data_weights_batch,
            lt_target_batch,
            gt_target_batch,
        ) in self.data_loader:
            mask = paddle.to_tensor(data=mask_batch, dtype="float32")

            # Missing targets (None) are replaced by 0 before tensor conversion.
            filled_targets = [
                [(x if x is not None else 0) for x in row] for row in target_batch
            ]
            targets = paddle.to_tensor(data=filled_targets)

            if self.args.target_weights is None:
                target_weights = paddle.ones(shape=tuple(targets.shape)[1])
            else:
                target_weights = paddle.to_tensor(data=self.args.target_weights)
            target_weights = target_weights.unsqueeze(axis=0)

            data_weights = paddle.to_tensor(data=data_weights_batch).unsqueeze(axis=1)

            # The bounds are converted for the bounded loss but are not part of
            # the yielded dictionaries.
            if self.args.loss_function == "bounded_mse":
                lt_target_batch = paddle.to_tensor(data=lt_target_batch)
                gt_target_batch = paddle.to_tensor(data=gt_target_batch)

            inputs = {
                self.input_keys[0]: mol_batch,
                self.input_keys[1]: features_batch,
                self.input_keys[2]: atom_descriptors_batch,
                self.input_keys[3]: atom_features_batch,
                self.input_keys[4]: bond_features_batch,
            }
            labels = {
                self.label_keys[0]: targets,
                self.label_keys[1]: data_weights,
                self.label_keys[2]: mask,
                self.label_keys[3]: target_weights,
            }
            yield (inputs, labels, {})

MOlFLOWDataset

Bases: Dataset

Class for moflow qm9 and zinc250k Dataset of a tuple of datasets.

It combines multiple datasets into one dataset. Each example is represented by a tuple whose i-th item corresponds to the i-th dataset. And each i-th dataset is expected to be an instance of numpy.ndarray.

Args: file_path (str): Data set path. data_name (str): Data name, "qm9" or "zinc250k". valid_idx (List[int]): Indices of the samples used for validation. mode (str): "train" or "eval"; selects which split is output. input_keys (Tuple[str, ...]): Input keys, such as ("nodes","edges",). label_keys (Tuple[str, ...]): labels (str or list or None). smiles_col (str): smiles column. weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of each constraint variable. Defaults to None. transform_fn: An optional function applied to an item before returning.

Source code in ppsci/data/dataset/moflow_dataset.py
class MOlFLOWDataset(io.Dataset):
    """Class for moflow qm9 and zinc250k Dataset of a tuple of datasets.

    It combines multiple datasets into one dataset. Each example is represented
    by a tuple whose ``i``-th item corresponds to the i-th dataset.
    And each ``i``-th dataset is expected to be an instance of numpy.ndarray.

    Args:
        file_path (str): Data set path.
        data_name (str): Data name, "qm9" or "zinc250k".
        valid_idx (List[int]): Indices of the samples used for validation.
        mode (str): "train" or "eval"; selects which split is kept. Any other
            value keeps every sample.
        input_keys (Tuple[str, ...]): Input keys, such as ("nodes","edges",).
        label_keys (Tuple[str, ...]): Names of the label columns in the CSV file.
        smiles_col (str): Name of the smiles column.
        weight_dict (Optional[Dict[str, float]]): Define the weight of each
            constraint variable. Defaults to None.
        transform_fn (Optional[Callable]): An optional function applied to an
            item before returning. Defaults to None.

    Raises:
        ValueError: If ``data_name`` is neither "qm9" nor "zinc250k".
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = True

    def __init__(
        self,
        file_path: str,
        data_name: str,
        valid_idx: List[int],
        mode: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        smiles_col: str,
        weight_dict: Optional[Dict[str, float]] = None,
        transform_fn: Optional[Callable] = None,
    ):
        super().__init__()
        self.file_path = file_path
        self.data_name = data_name
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.smiles_col = smiles_col
        self.weight_dict = weight_dict

        if data_name == "qm9":
            max_atoms = 9
        elif data_name == "zinc250k":
            max_atoms = 38
        else:
            # Previously this fell through and failed later with an
            # UnboundLocalError on `max_atoms`.
            raise ValueError(
                f"data_name should be 'qm9' or 'zinc250k', but got {data_name!r}."
            )

        self.molgraph = MolGraph(out_size=max_atoms, kekulize=True)
        self.logger = logger
        # read and deal data from file
        inputs, labels = self.load_csv_file(file_path, data_name + ".csv")
        # A set keeps the membership test O(1) per index.
        valid_set = set(valid_idx)
        train_idx = [t for t in range(len(inputs[0])) if t not in valid_set]
        self.train_idx = train_idx
        #  data train or test
        if mode == "train":
            inputs = [
                np.array(list(io.Subset(dataset=in_put, indices=train_idx)))
                for in_put in inputs
            ]
            labels = np.array(list(io.Subset(dataset=labels, indices=train_idx)))
        elif mode == "eval":
            inputs = [
                np.array(list(io.Subset(dataset=in_put, indices=valid_idx)))
                for in_put in inputs
            ]
            labels = np.array(list(io.Subset(dataset=labels, indices=valid_idx)))

        # fetch input data
        self.input = {key: inputs[i] for i, key in enumerate(self.input_keys)}
        # fetch label data
        self.label = {"label": labels}

        self.logger.message(
            f"Dataload finished. MODE {mode}, "
            f"inputs {len(next(iter(self.input.values())))}, "
            f"labelS {len(next(iter(self.label.values())))}"
        )

        self._length = len(next(iter(self.input.values())))
        self.transform = transform_fn

    def __getitem__(self, index: int):
        input_item = {key: value[index] for key, value in self.input.items()}
        label_item = {key: value[index] for key, value in self.label.items()}

        if self.transform:
            input_item, label_item = self.transform_func(input_item, label_item)

        return (input_item, label_item, {})

    def __len__(self):
        return self._length

    def load_csv_file(self, path: str, name: str):
        """Parse DataFrame using `MolGraph` and prepare a dataset instance.

        Labels are extracted from the `label_keys` columns and input features
        are extracted from the smiles information in the `smiles_col` column.
        Rows whose smiles cannot be parsed are dropped from the inputs and from
        the labels alike, so both stay aligned row by row.

        Args:
            path (str): Directory containing the CSV file.
            name (str): File name of the CSV file.

        Returns:
            Tuple ``([nodes, edges], labels)`` of numpy arrays.

        Raises:
            NotImplementedError: If ``self.molgraph`` is not a `MolGraph`.
        """
        file = os.path.join(path, name)
        df = pd.read_csv(file, index_col=0)
        all_nodes = []
        all_edges = []
        # Positions of the rows that were converted successfully.
        kept_rows = []

        total_count = df.shape[0]
        fail_count = 0
        success_count = 0
        if not isinstance(self.molgraph, MolGraph):
            raise NotImplementedError

        for row, smiles in enumerate(tqdm(df[self.smiles_col], total=total_count)):
            try:
                mol = Chem.MolFromSmiles(smiles)
                if mol is None:
                    fail_count += 1
                    continue
                _, mol = self.molgraph.prepare_smiles_and_mol(mol)
                nodes, edges = self.molgraph.get_input_features(mol)

            except MolGraphError as e:
                fail_count += 1
                self.logger.warning(f"parse(), type: {type(e).__name__}, {e.args}")
                continue
            except Exception as e:
                # Best effort: a molecule that cannot be converted is logged and
                # skipped instead of aborting the whole load.
                self.logger.warning(f"parse(), type: {type(e).__name__}, {e.args}")
                fail_count += 1
                continue

            all_nodes.append(nodes)
            all_edges.append(edges)
            kept_rows.append(row)

            success_count += 1

        labels = np.array(
            [*(df[label_col].values for label_col in self.label_keys)]
        ).T
        if fail_count and labels.ndim == 2:
            # Drop the labels of skipped rows; otherwise label k would no
            # longer belong to molecule k.
            labels = labels[np.asarray(kept_rows, dtype=np.intp)]
        result = [np.array(all_nodes), np.array(all_edges)], labels
        self.logger.message(
            f"Preprocess finished. FAIL {fail_count}, "
            f"SUCCESS {success_count}, TOTAL {total_count}"
        )

        return result

    def transform_func(self, data_dict, label_dict):
        """Apply ``self.transform`` to every sample of a batch.

        Each sample is assembled as ``inputs + labels`` (in dict order), passed
        through the transform and the results are regrouped per field.

        Args:
            data_dict: Batched input arrays keyed by input name.
            label_dict: Batched label arrays keyed by label name.

        Returns:
            Tuple ``(data_dict, label_dict)`` with the transformed batch.
        """
        items = []
        length = len(next(iter(data_dict.values())))
        for idx in range(length):
            input_item = [value[idx] for value in data_dict.values()]
            label_item = [value[idx] for value in label_dict.values()]
            item = input_item + label_item
            if self.transform:
                item = self.transform(item)
            items.append(item)
        # After the transpose, items[i] gathers the i-th field of every sample.
        items = np.array(items, dtype=object).T

        data_dict = {key: np.stack(items[i], axis=0) for i, key in enumerate(data_dict)}
        # Field 2 is the label (after nodes and edges). The whole column is
        # used; the former `item[2]` only picked the label of the last sample.
        label_dict = {key: np.vstack(items[2]) for key in label_dict}

        return data_dict, label_dict

load_csv_file(path, name)

Parse DataFrame using MolGraph and prepare a dataset instance. Labels are extracted from the label columns and input features are extracted from the smiles information in the smiles column.

Source code in ppsci/data/dataset/moflow_dataset.py
def load_csv_file(self, path: str, name: str):
    """Parse DataFrame using `MolGraph` and prepare a dataset instance.

    Labels are extracted from the `label_keys` columns and input features
    are extracted from the smiles information in the `smiles_col` column.
    Rows whose smiles cannot be parsed are dropped from the inputs and from
    the labels alike, so both stay aligned row by row.

    Args:
        path (str): Directory containing the CSV file.
        name (str): File name of the CSV file.

    Returns:
        Tuple ``([nodes, edges], labels)`` of numpy arrays.

    Raises:
        NotImplementedError: If ``self.molgraph`` is not a `MolGraph`.
    """
    file = os.path.join(path, name)
    df = pd.read_csv(file, index_col=0)
    all_nodes = []
    all_edges = []
    # Positions of the rows that were converted successfully.
    kept_rows = []

    total_count = df.shape[0]
    fail_count = 0
    success_count = 0
    if not isinstance(self.molgraph, MolGraph):
        raise NotImplementedError

    for row, smiles in enumerate(tqdm(df[self.smiles_col], total=total_count)):
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                fail_count += 1
                continue
            _, mol = self.molgraph.prepare_smiles_and_mol(mol)
            nodes, edges = self.molgraph.get_input_features(mol)

        except MolGraphError as e:
            fail_count += 1
            self.logger.warning(f"parse(), type: {type(e).__name__}, {e.args}")
            continue
        except Exception as e:
            # Best effort: a molecule that cannot be converted is logged and
            # skipped instead of aborting the whole load.
            self.logger.warning(f"parse(), type: {type(e).__name__}, {e.args}")
            fail_count += 1
            continue

        all_nodes.append(nodes)
        all_edges.append(edges)
        kept_rows.append(row)

        success_count += 1

    labels = np.array(
        [*(df[label_col].values for label_col in self.label_keys)]
    ).T
    if fail_count and labels.ndim == 2:
        # Drop the labels of skipped rows; otherwise label k would no longer
        # belong to molecule k.
        labels = labels[np.asarray(kept_rows, dtype=np.intp)]
    result = [np.array(all_nodes), np.array(all_edges)], labels
    self.logger.message(
        f"Preprocess finished. FAIL {fail_count}, "
        f"SUCCESS {success_count}, TOTAL {total_count}"
    )

    return result

NamedArrayDataset

Bases: Dataset

Class for Named Array Dataset.

Parameters:

Name Type Description Default
input Dict[str, ndarray]

Input dict.

required
label Optional[Dict[str, ndarray]]

Label dict. Defaults to None.

None
weight Optional[Dict[str, ndarray]]

Weight dict. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import numpy as np
>>> import ppsci
>>> input = {"x": np.random.randn(100, 1)}
>>> output = {"u": np.random.randn(100, 1)}
>>> weight = {"u": np.random.randn(100, 1)}
>>> dataset = ppsci.data.dataset.NamedArrayDataset(input, output, weight)
Source code in ppsci/data/dataset/array_dataset.py
class NamedArrayDataset(io.Dataset):
    """Dataset that serves samples from dictionaries of named arrays.

    Args:
        input (Dict[str, np.ndarray]): Input dict.
        label (Optional[Dict[str, np.ndarray]]): Label dict. Defaults to None,
            which is treated as an empty dict.
        weight (Optional[Dict[str, np.ndarray]]): Weight dict. Defaults to None,
            which is treated as an empty dict.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import numpy as np
        >>> import ppsci
        >>> input = {"x": np.random.randn(100, 1)}
        >>> output = {"u": np.random.randn(100, 1)}
        >>> weight = {"u": np.random.randn(100, 1)}
        >>> dataset = ppsci.data.dataset.NamedArrayDataset(input, output, weight)
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = True

    def __init__(
        self,
        input: Dict[str, np.ndarray],
        label: Optional[Dict[str, np.ndarray]] = None,
        weight: Optional[Dict[str, np.ndarray]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input = input
        self.label = label if label is not None else {}
        self.input_keys = tuple(input.keys())
        self.label_keys = tuple(self.label.keys())
        self.weight = weight if weight is not None else {}
        self.transforms = transforms
        # The dataset length is the length of the first input array.
        self._len = len(next(iter(input.values())))

        # Only warn (never fail) when an input and a label sharing the same
        # name have different lengths.
        for name, array in input.items():
            if name not in self.label:
                continue
            n_input = len(array)
            n_label = len(self.label[name])
            if n_input != n_label:
                logger.warning(
                    f"The length of input {name}({n_input}) is not equal to "
                    f"the length of label {name}({n_label})."
                )

    def __getitem__(self, idx):
        def pick(table):
            # Index every array of the dict with the same index.
            return {name: array[idx] for name, array in table.items()}

        input_item = pick(self.input)
        label_item = pick(self.label)
        weight_item = pick(self.weight)

        if self.transforms is None:
            return (input_item, label_item, weight_item)

        input_item, label_item, weight_item = self.transforms(
            input_item, label_item, weight_item
        )
        return (input_item, label_item, weight_item)

    def __len__(self):
        return self._len

NPZDataset

Bases: Dataset

Dataset class for .npz file.

Parameters:

Name Type Description Default
file_path str

Npz file path.

required
input_keys Tuple[str, ...]

List of input keys.

required
label_keys Tuple[str, ...]

List of label keys. Defaults to ().

()
alias_dict Optional[Dict[str, str]]

Dict of alias(es) for input and label keys. i.e. {inner_key: outer_key}. Defaults to None.

None
weight_dict Optional[Dict[str, Union[Callable, float]]]

Define the weight of each constraint variable. Defaults to None.

None
timestamps Optional[Tuple[float, ...]]

The number of repetitions of the data in the time dimension. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.NPZDataset(
...     "/path/to/file.npz",
...     ("x",),
...     ("u",),
... )
Source code in ppsci/data/dataset/npz_dataset.py
class NPZDataset(io.Dataset):
    """Dataset class for .npz file.

    Args:
        file_path (str): Npz file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...], optional): List of label keys. Defaults to ().
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            i.e. {inner_key: outer_key}. Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. A number is broadcast to the shape of the
            first label array; a callable is called with the input dict.
            Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): When the loaded data has a "t"
            entry, only the rows whose time is close to one of these values are
            kept; otherwise the data is combined with these timestamps and "t"
            is prepended to the input keys. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.NPZDataset(
        ...     "/path/to/file.npz",
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = True

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...] = (),
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # Load the requested arrays from disk.
        raw_data = reader.load_npz_file(
            file_path,
            input_keys + label_keys,
            alias_dict,
        )

        if timestamps is not None:
            if "t" in raw_data:
                # Keep only the rows recorded at one of the given timestamps.
                time_column = raw_data["t"]
                hits = [
                    np.nonzero(np.isclose(time_column, ti).flatten())[0]
                    for ti in timestamps
                ]
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                stacked = stacked[np.concatenate(hits, 0)]
                raw_data = misc.convert_to_dict(
                    stacked, self.input_keys + self.label_keys
                )
            else:
                # No time column: combine the data with the given timestamps
                # and expose "t" as an additional leading input.
                stacked = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                stacked = misc.combine_array_with_time(stacked, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    stacked, self.input_keys + self.label_keys
                )

        # Split the loaded arrays into inputs and labels.
        self.input = {
            name: array for name, array in raw_data.items() if name in self.input_keys
        }
        self.label = {
            name: array for name, array in raw_data.items() if name in self.label_keys
        }

        # Build one weight array per entry of `weight_dict`.
        self.weight = {}
        if weight_dict is not None:
            for name, spec in weight_dict.items():
                if isinstance(spec, (int, float)):
                    self.weight[name] = np.full_like(
                        next(iter(self.label.values())), spec
                    )
                    continue
                if not callable(spec):
                    raise NotImplementedError(f"type of {type(spec)} is invalid yet.")
                computed = spec(self.input)
                if isinstance(computed, (int, float)):
                    # A scalar result is broadcast like a constant weight.
                    computed = np.full_like(
                        next(iter(self.label.values())), computed
                    )
                self.weight[name] = computed

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    def __getitem__(self, idx):
        def pick(table):
            # Index every array of the dict with the same index.
            return {name: array[idx] for name, array in table.items()}

        input_item = pick(self.input)
        label_item = pick(self.label)
        weight_item = pick(self.weight)

        if self.transforms is None:
            return (input_item, label_item, weight_item)

        input_item, label_item, weight_item = self.transforms(
            input_item, label_item, weight_item
        )
        return (input_item, label_item, weight_item)

    def __len__(self):
        return self._len

PEMSDataset

Bases: Dataset

Dataset class for PEMSD4 and PEMSD8 dataset.

Parameters:

Name Type Description Default
file_path str

Dataset root path.

required
split str

Dataset split label.

required
input_keys Tuple[str, ...]

A tuple of input keys.

required
label_keys Tuple[str, ...]

A tuple of label keys.

required
weight_dict Optional[Dict[str, float]]

Define the weight of each constraint variable. Defaults to None.

None
transforms Optional[Compose]

Compose object contains sample wise transform(s). Defaults to None.

None
norm_input bool

Whether to normalize the input. Defaults to True.

True
norm_label bool

Whether to normalize the output. Defaults to False.

False
input_len int

The input timesteps. Defaults to 12.

12
label_len int

The output timesteps. Defaults to 12.

12

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.PEMSDataset(
...     "./Data/PEMSD4",
...     "train",
...     ("input",),
...     ("label",),
... )
Source code in ppsci/data/dataset/pems_dataset.py
class PEMSDataset(Dataset):
    """Dataset class for PEMSD4 and PEMSD8 dataset.

    The split file ``<split>.npy`` as well as ``mean.npy`` and ``std.npy`` are
    loaded from ``file_path``; the series is cut into input/label windows and
    optionally normalized with the loaded mean and standard deviation.

    Args:
        file_path (str): Dataset root path.
        split (str): Dataset split label.
        input_keys (Tuple[str, ...]): A tuple of input keys.
        label_keys (Tuple[str, ...]): A tuple of label keys.
        weight_dict (Optional[Dict[str, float]]): Define the weight of each constraint variable. Defaults to None.
        transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.
        norm_input (bool): Whether to normalize the input. Defaults to True.
        norm_label (bool): Whether to normalize the output. Defaults to False.
        input_len (int): The input timesteps. Defaults to 12.
        label_len (int): The output timesteps. Defaults to 12.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.PEMSDataset(
        ...     "./Data/PEMSD4",
        ...     "train",
        ...     ("input",),
        ...     ("label",),
        ... )  # doctest: +SKIP
    """

    def __init__(
        self,
        file_path: str,
        split: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        weight_dict: Optional[Dict[str, float]] = None,
        transforms: Optional[Compose] = None,
        norm_input: bool = True,
        norm_label: bool = False,
        input_len: int = 12,
        label_len: int = 12,
    ):
        super().__init__()

        self.input_keys = input_keys
        self.label_keys = label_keys
        self.weight_dict = weight_dict

        self.transforms = transforms
        self.norm_input = norm_input
        self.norm_label = norm_label

        data = np.load(os.path.join(file_path, f"{split}.npy")).astype(np.float32)

        self.mean = np.load(os.path.join(file_path, "mean.npy")).astype(np.float32)
        self.std = np.load(os.path.join(file_path, "std.npy")).astype(np.float32)
        self.scaler = StandardScaler(self.mean, self.std)

        X, Y = add_window_horizon(data, input_len, label_len)
        if norm_input:
            X = self.scaler.transform(X)
        if norm_label:
            Y = self.scaler.transform(Y)

        self._len = X.shape[0]

        self.input = {input_keys[0]: X}
        self.label = {label_keys[0]: Y}

        # `self.weight` must always exist because __getitem__ iterates over it.
        # Previously it was only assigned when `weight_dict` was None, so any
        # dataset created with weights failed on the first sample access.
        self.weight = {}
        if weight_dict is not None:
            # Every label key defaults to weight 1.0 unless overridden.
            self.weight_dict = {key: np.array(1.0) for key in self.label_keys}
            self.weight_dict.update(weight_dict)
            # Expand each scalar weight to the label shape so that it can be
            # indexed per sample like inputs and labels.
            self.weight = {
                key: np.full_like(Y, value) for key, value in self.weight_dict.items()
            }

    def __getitem__(self, idx):
        input_item = {key: value[idx] for key, value in self.input.items()}
        label_item = {key: value[idx] for key, value in self.label.items()}
        weight_item = {key: value[idx] for key, value in self.weight.items()}

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return (input_item, label_item, weight_item)

    def __len__(self):
        return self._len

RadarDataset

Bases: Dataset

Class for Radar dataset.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
image_width int

Image width.

required
image_height int

Image height.

required
total_length int

Total length.

required
dataset_path str

Dataset path.

required
data_type str

Input and output data type. Defaults to paddle.get_default_dtype().

get_default_dtype()
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None

Examples:

>>> import paddle
>>> import ppsci
>>> dataset = ppsci.data.dataset.RadarDataset(
...     input_keys=("input",),
...     label_keys=("output",),
...     image_width=512,
...     image_height=512,
...     total_length=29,
...     dataset_path="datasets/mrms/figure",
...     data_type=paddle.get_default_dtype(),
... )
Source code in ppsci/data/dataset/radar_dataset.py
class RadarDataset(io.Dataset):
    """Class for Radar dataset.

    Every entry returned by ``os.listdir(dataset_path)`` is treated as one case
    made of 29 frames stored as ``<case>/<case>-00.png`` ...
    ``<case>/<case>-28.png``. A sample consists of the last ``total_length``
    frames of a case.

    Args:
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        image_width (int): Image width.
        image_height (int): Image height.
        total_length (int): Total length.
        dataset_path (str): Dataset path.
        data_type (str): Input and output data type. Defaults to paddle.get_default_dtype().
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.

    Examples:
        >>> import paddle
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.RadarDataset(
        ...     input_keys=("input",),
        ...     label_keys=("output",),
        ...     image_width=512,
        ...     image_height=512,
        ...     total_length=29,
        ...     dataset_path="datasets/mrms/figure",
        ...     data_type=paddle.get_default_dtype(),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    # Number of frame files that make up one case on disk.
    _num_frames_per_case: int = 29

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        image_width: int,
        image_height: int,
        total_length: int,
        dataset_path: str,
        data_type: str = paddle.get_default_dtype(),
        weight_dict: Optional[Dict[str, float]] = None,
    ):
        """Store the configuration and build the list of frame paths per case.

        Raises:
            ModuleNotFoundError: If ``cv2`` (opencv-python) is not installed.
        """
        super().__init__()
        if importlib.util.find_spec("cv2") is None:
            raise ModuleNotFoundError(
                "To use RadarDataset, please install 'opencv-python' with: `pip install "
                "opencv-python` first."
            )
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.img_width = image_width
        self.img_height = image_height
        self.length = total_length
        self.dataset_path = dataset_path
        self.data_type = data_type

        if weight_dict is None:
            self.weight_dict = {}
        else:
            # Every label key gets weight 1.0 unless the caller overrides it.
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        # One list of frame paths per case, cases ordered by name.
        self.case_list = [
            [
                f"{self.dataset_path}/{name}/{name}-{str(i).zfill(2)}.png"
                for i in range(self._num_frames_per_case)
            ]
            for name in sorted(os.listdir(self.dataset_path))
        ]

    def _load(self, index):
        """Read all frames of case ``index`` into one array of shape (frames, H, W).

        Pixel values are converted with ``value / 10.0 - 3.0``.

        Raises:
            OSError: If a frame file cannot be read by ``cv2.imread``.
        """
        frames = []
        for img_path in self.case_list[index]:
            img = cv2.imread(img_path, 2)  # flag 2 == cv2.IMREAD_ANYDEPTH
            if img is None:
                # cv2.imread signals failure by returning None instead of raising;
                # fail here with the offending path rather than later in astype().
                raise OSError(f"Failed to read radar frame: {img_path}")
            frames.append(np.expand_dims(img, axis=0))
        data = np.concatenate(frames, axis=0).astype(self.data_type) / 10.0 - 3.0
        assert data.shape[1] <= 1024 and data.shape[2] <= 1024
        return data

    def __getitem__(self, index):
        """Return ``(input_item, label_item, weight_item)`` for case ``index``.

        The input is an array of shape (length, img_height, img_width, 2):
        channel 0 holds the frame values clipped to [0, 128], channel 1 a mask
        that is 0 where the loaded value was negative and 1 elsewhere. Labels
        are empty arrays, one per label key.
        """
        data = self._load(index)[-self.length :].copy()
        negative = data < 0
        mask = np.ones_like(data)
        mask[negative] = 0
        data[negative] = 0
        data = np.clip(data, 0, 128)
        vid = np.zeros(
            (self.length, self.img_height, self.img_width, 2), dtype=self.data_type
        )
        vid[..., 0] = data
        vid[..., 1] = mask

        input_item = {self.input_keys[0]: vid}
        label_item = {
            key: np.asarray([], paddle.get_default_dtype()) for key in self.label_keys
        }
        weight_item = {}
        if len(label_item) > 0:
            # Weights are broadcastable scalars with the same rank as the labels.
            weight_shape = [1] * len(next(iter(label_item.values())).shape)
            weight_item = {
                key: np.full(weight_shape, value, paddle.get_default_dtype())
                for key, value in self.weight_dict.items()
            }
        return input_item, label_item, weight_item

    def __len__(self):
        """Return the number of cases found under ``dataset_path``."""
        return len(self.case_list)

RosslerDataset

Bases: LorenzDataset

Dataset for training Rossler model.

Parameters:

Name Type Description Default
file_path str

Data set path.

required
input_keys Tuple[str, ...]

Input keys, such as ("states",).

required
label_keys Tuple[str, ...]

Output keys, such as ("pred_states", "recover_states").

required
block_size int

Data block size.

required
stride int

Data stride.

required
ndata Optional[int]

Number of data series to use. Defaults to None.

None
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
embedding_model Optional[Arch]

Embedding model. Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.RosslerDataset(
...     file_path="/path/to/RosslerDataset",
...     input_keys=("x",),
...     label_keys=("v",),
...     block_size=32,
...     stride=16,
... )
Source code in ppsci/data/dataset/trphysx_dataset.py
class RosslerDataset(LorenzDataset):
    """Dataset for training Rossler model.

    Checks that ``file_path`` exists and then delegates everything else to
    ``LorenzDataset``.

    Args:
        file_path (str): Data set path.
        input_keys (Tuple[str, ...]): Input keys, such as ("states",).
        label_keys (Tuple[str, ...]): Output keys, such as ("pred_states", "recover_states").
        block_size (int): Data block size.
        stride (int): Data stride.
        ndata (Optional[int]): Number of data series to use. Defaults to None.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        embedding_model (Optional[base.Arch]): Embedding model. Defaults to None.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.RosslerDataset(
        ...     file_path="/path/to/RosslerDataset",
        ...     input_keys=("x",),
        ...     label_keys=("v",),
        ...     block_size=32,
        ...     stride=16,
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        block_size: int,
        stride: int,
        ndata: Optional[int] = None,
        weight_dict: Optional[Dict[str, float]] = None,
        embedding_model: Optional[base.Arch] = None,
    ):
        dataset_missing = not os.path.exists(file_path)
        if dataset_missing:
            # Point the user at the download locations of both splits.
            download_hint = (
                f"file_path({file_path}) not exists. Please download dataset first. "
                "Training: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/rossler_training.hdf5. "
                "Valid: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/rossler_valid.hdf5."
            )
            raise FileNotFoundError(download_hint)

        parent_args = (
            file_path,
            input_keys,
            label_keys,
            block_size,
            stride,
            ndata,
            weight_dict,
            embedding_model,
        )
        super().__init__(*parent_args)

SEVIRDataset

Bases: Dataset

The Storm EVent ImageRy (SEVIR) dataset.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Name of input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Name of label keys, such as ("output",).

required
data_dir str

The path of the dataset.

required
weight_dict Optional[Dict[str, Union[Callable, float]]]

Define the weight of each constraint variable. Defaults to None.

None
data_types Sequence[str]

A subset of SEVIR_DATA_TYPES. Defaults to [ "vil", ].

['vil']
seq_len int

The length of the data sequences. Should be smaller than the max length raw_seq_len. Defaults to 49.

49
raw_seq_len int

The length of the raw data sequences. Defaults to 49.

49
sample_mode str

The mode of sampling, e.g. 'random' or 'sequent'. Defaults to "sequent".

'sequent'
stride int

Useful when sample_mode == 'sequent'. stride must not be smaller than out_len to prevent data leakage in testing. Defaults to 12.

12
batch_size int

The batch size. Defaults to 1.

1
layout str

Consists of batch_size 'N', seq_len 'T', channel 'C', height 'H', width 'W' The layout of sampled data. Raw data layout is 'NHWT'. valid layout: 'NHWT', 'NTHW', 'NTCHW', 'TNHW', 'TNCHW'. Defaults to "NHWT".

'NHWT'
in_len int

The length of input data. Defaults to 13.

13
out_len int

The length of output data. Defaults to 12.

12
num_shard int

Split the whole dataset into num_shard parts for distributed training. Defaults to 1.

1
rank int

Rank of the current process within num_shard. Defaults to 0.

0
split_mode str

If 'ceil', all num_shard dataloaders have the same length = ceil(total_len / num_shard). Different dataloaders may have some duplicated data batches, if the total size of datasets is not divided by num_shard. if 'floor', all num_shard dataloaders have the same length = floor(total_len / num_shard). The last several data batches may be wasted, if the total size of datasets is not divided by num_shard. if 'uneven', the last datasets has larger length when the total length is not divided by num_shard. The uneven split leads to synchronization error in dist.all_reduce() or dist.barrier(). See related issue: https://github.com/pytorch/pytorch/issues/33148 Notice: this also affects the behavior of self.use_up. Defaults to "uneven".

'uneven'
start_date datetime

Start time of SEVIR samples to generate. Defaults to None.

None
end_date datetime

End time of SEVIR samples to generate. Defaults to None.

None
datetime_filter function

Mask function applied to time_utc column of catalog (return true to keep the row). Pass function of the form lambda t : COND(t) Example: lambda t: np.logical_and(t.dt.hour>=13,t.dt.hour<=21) # Generate only day-time events. Defaults to None.

None
catalog_filter function

Function or None or 'default' Mask function applied to entire catalog dataframe (return true to keep row). Pass function of the form lambda catalog: COND(catalog) Example: lambda c: [s[0]=='S' for s in c.id] # Generate only the 'S' events

'default'
shuffle bool

If True, data samples are shuffled before each epoch. Defaults to False.

False
shuffle_seed int

Seed to use for shuffling. Defaults to 1.

1
output_type dtype

The type of generated tensors. Defaults to np.float32.

float32
preprocess bool

If True, self.preprocess_data_dict(data_dict) is called before each sample generated. Defaults to True.

True
rescale_method str

The method of rescale. Defaults to "01".

'01'
downsample_dict Dict[str, Sequence[int]]

Downsample_dict.keys() == data_types. downsample_dict[key] is a Sequence of (t_factor, h_factor, w_factor),representing the downsampling factors of all dimensions. Defaults to None.

None
verbose bool

Verbose when opening raw data files. Defaults to False.

False
training str

Training phase. Defaults to "train".

'train'
Source code in ppsci/data/dataset/sevir_dataset.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
class SEVIRDataset(io.Dataset):
    """The Storm EVent ImageRy (SEVIR) dataset.

    Args:
        input_keys (Tuple[str, ...]): Name of input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Name of label keys, such as ("output",).
        data_dir (str): The path of the dataset.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of each constraint variable. Defaults to None.
        data_types (Sequence[str], optional): A subset of SEVIR_DATA_TYPES. Defaults to [ "vil", ].
        seq_len (int, optional): The length of the data sequences. Should be smaller than the max length raw_seq_len. Defaults to 49.
        raw_seq_len (int, optional): The length of the raw data sequences. Defaults to 49.
        sample_mode (str, optional): The mode of sampling, e.g. 'random' or 'sequent'. Defaults to "sequent".
        stride (int, optional): Useful when sample_mode == 'sequent'.
            stride must not be smaller than out_len to prevent data leakage in testing. Defaults to 12.
        batch_size (int, optional): The batch size. Defaults to 1.
        layout (str, optional): Consists of batch_size 'N', seq_len 'T', channel 'C', height 'H', width 'W'
            The layout of sampled data. Raw data layout is 'NHWT'.
            valid layout: 'NHWT', 'NTHW', 'NTCHW', 'TNHW', 'TNCHW'. Defaults to "NHWT".
        in_len (int, optional): The length of input data. Defaults to 13.
        out_len (int, optional): The length of output data. Defaults to 12.
        num_shard (int, optional): Split the whole dataset into num_shard parts for distributed training. Defaults to 1.
        rank (int, optional): Rank of the current process within num_shard. Defaults to 0.
        split_mode (str, optional): If 'ceil', all `num_shard` dataloaders have the same length = ceil(total_len / num_shard).
            Different dataloaders may have some duplicated data batches, if the total size of datasets is not divided by num_shard.
            if 'floor', all `num_shard` dataloaders have the same length = floor(total_len / num_shard).
            The last several data batches may be wasted, if the total size of datasets is not divided by num_shard.
            if 'uneven', the last datasets has larger length when the total length is not divided by num_shard.
            The uneven split leads to synchronization error in dist.all_reduce() or dist.barrier().
            See related issue: https://github.com/pytorch/pytorch/issues/33148
            Notice: this also affects the behavior of `self.use_up`. Defaults to "uneven".
        start_date (datetime.datetime, optional): Start time of SEVIR samples to generate. Defaults to None.
        end_date (datetime.datetime, optional): End time of SEVIR samples to generate. Defaults to None.
        datetime_filter (function, optional): Mask function applied to time_utc column of catalog (return true to keep the row).
            Pass function of the form   lambda t : COND(t)
            Example:  lambda t: np.logical_and(t.dt.hour>=13,t.dt.hour<=21)  # Generate only day-time events. Defaults to None.
        catalog_filter (function, optional): Function or None or 'default'
            Mask function applied to entire catalog dataframe (return true to keep row).
            Pass function of the form lambda catalog:  COND(catalog)
            Example:  lambda c:  [s[0]=='S' for s in c.id]   # Generate only the 'S' events. Defaults to "default".
        shuffle (bool, optional): If True, data samples are shuffled before each epoch. Defaults to False.
        shuffle_seed (int, optional): Seed to use for shuffling. Defaults to 1.
        output_type (np.dtype, optional): The type of generated tensors. Defaults to np.float32.
        preprocess (bool, optional): If True, self.preprocess_data_dict(data_dict) is called before each sample generated. Defaults to True.
        rescale_method (str, optional): The method of rescale. Defaults to "01".
        downsample_dict (Dict[str, Sequence[int]], optional): Downsample_dict.keys() == data_types.
            downsample_dict[key] is a Sequence of (t_factor, h_factor, w_factor), representing the downsampling factors of all dimensions. Defaults to None.
        verbose (bool, optional): Verbose when opening raw data files. Defaults to False.
        training (str, optional): Training phase. Defaults to "train".
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        weight_dict: Optional[Dict[str, float]] = None,
        data_types: Sequence[str] = [
            "vil",
        ],
        seq_len: int = 49,
        raw_seq_len: int = 49,
        sample_mode: str = "sequent",
        stride: int = 12,
        batch_size: int = 1,
        layout: str = "NHWT",
        in_len: int = 13,
        out_len: int = 12,
        num_shard: int = 1,
        rank: int = 0,
        split_mode: str = "uneven",
        start_date: datetime.datetime = None,
        end_date: datetime.datetime = None,
        datetime_filter=None,
        catalog_filter="default",
        shuffle: bool = False,
        shuffle_seed: int = 1,
        output_type=np.float32,
        preprocess: bool = True,
        rescale_method: str = "01",
        downsample_dict: Dict[str, Sequence[int]] = None,
        verbose: bool = False,
        training="train",
    ):
        """Read the SEVIR catalog, filter it and open the HDF5 files it refers to.

        See the class docstring for the meaning of every argument.
        ``start_date`` / ``end_date`` may be given either as ``datetime.datetime``
        objects or as sequences such as ``(year, month, day)`` that are unpacked
        into ``datetime.datetime``.

        Raises:
            ValueError: If ``seq_len`` exceeds ``raw_seq_len`` or if ``sample_mode``,
                ``layout`` or ``split_mode`` is not one of the supported values.
        """

        def _as_datetime(value):
            # Accept ready-made datetime objects (the documented type) as well as
            # the (year, month, day, ...) sequences that were unpacked before.
            if value is None or isinstance(value, datetime.datetime):
                return value
            return datetime.datetime(*value)

        super(SEVIRDataset, self).__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.data_dir = data_dir
        if weight_dict is None:
            self.weight_dict = {}
        else:
            # Every label key gets weight 1.0 unless the caller overrides it.
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        # Full-resolution SEVIR layout: <data_dir>/sevir/{CATALOG.csv, data/}.
        # (A "sevir_lr" directory with the same layout was used for the
        # low-resolution variant in previously commented-out code.)
        sevir_root_dir = os.path.join(self.data_dir, "sevir")
        sevir_catalog = os.path.join(sevir_root_dir, "CATALOG.csv")
        sevir_data_dir = os.path.join(sevir_root_dir, "data")

        if data_types is None:
            data_types = SEVIR_DATA_TYPES
        else:
            assert set(data_types).issubset(SEVIR_DATA_TYPES)

        # configs which should not be modified
        self._dtypes = SEVIR_RAW_DTYPES
        self.lght_frame_times = LIGHTING_FRAME_TIMES
        self.data_shape = SEVIR_DATA_SHAPE

        self.raw_seq_len = raw_seq_len
        self.seq_len = seq_len

        if seq_len > raw_seq_len:
            raise ValueError("seq_len must be small than raw_seq_len")

        if sample_mode not in ["random", "sequent"]:
            raise ValueError("sample_mode must be 'random' or 'sequent'.")

        self.sample_mode = sample_mode
        self.stride = stride
        self.batch_size = batch_size
        valid_layout = ("NHWT", "NTHW", "NTCHW", "NTHWC", "TNHW", "TNCHW")
        if layout not in valid_layout:
            raise ValueError(
                f"Invalid layout = {layout}! Must be one of {valid_layout}."
            )
        self.layout = layout
        self.in_len = in_len
        self.out_len = out_len

        self.num_shard = num_shard
        self.rank = rank
        valid_split_mode = ("ceil", "floor", "uneven")
        if split_mode not in valid_split_mode:
            raise ValueError(
                f"Invalid split_mode: {split_mode}! Must be one of {valid_split_mode}."
            )
        self.split_mode = split_mode
        self._samples = None
        self._hdf_files = {}
        self.data_types = data_types
        if isinstance(sevir_catalog, str):
            self.catalog = pd.read_csv(
                sevir_catalog, parse_dates=["time_utc"], low_memory=False
            )
        else:
            self.catalog = sevir_catalog
        self.sevir_data_dir = sevir_data_dir
        self.datetime_filter = datetime_filter
        self.catalog_filter = catalog_filter
        # train val test split
        self.start_date = _as_datetime(start_date)
        self.end_date = _as_datetime(end_date)

        self.shuffle = shuffle
        self.shuffle_seed = int(shuffle_seed)
        self.output_type = output_type
        self.preprocess = preprocess
        self.downsample_dict = downsample_dict
        self.rescale_method = rescale_method
        self.verbose = verbose
        # NOTE: `training` is accepted but not used anywhere in this constructor.

        # Restrict the catalog to the half-open window (start_date, end_date].
        if self.start_date is not None:
            self.catalog = self.catalog[self.catalog.time_utc > self.start_date]
        if self.end_date is not None:
            self.catalog = self.catalog[self.catalog.time_utc <= self.end_date]
        if self.datetime_filter:
            self.catalog = self.catalog[self.datetime_filter(self.catalog.time_utc)]

        if self.catalog_filter is not None:
            if self.catalog_filter == "default":
                # The default filter keeps only rows without missing data.
                self.catalog_filter = lambda c: c.pct_missing == 0
            self.catalog = self.catalog[self.catalog_filter(self.catalog)]

        self._compute_samples()
        self._open_files(verbose=self.verbose)

    def _compute_samples(self):
        """
        Computes the list of samples in catalog to be used. This sets self._samples
        """
        # locate all events containing colocated data_types
        imgt = self.data_types
        imgts = set(imgt)
        filtcat = self.catalog[
            np.logical_or.reduce([self.catalog.img_type == i for i in imgt])
        ]
        # remove rows missing one or more requested img_types
        filtcat = filtcat.groupby("id").filter(
            lambda x: imgts.issubset(set(x["img_type"]))
        )
        # If there are repeated IDs, remove them (this is a bug in SEVIR)
        # TODO: is it necessary to keep one of them instead of deleting them all
        filtcat = filtcat.groupby("id").filter(lambda x: x.shape[0] == len(imgt))
        self._samples = filtcat.groupby("id").apply(
            lambda df: self._df_to_series(df, imgt)
        )
        if self.shuffle:
            self.shuffle_samples()

    def shuffle_samples(self):
        self._samples = self._samples.sample(frac=1, random_state=self.shuffle_seed)

    def _df_to_series(self, df, imgt):
        d = {}
        df = df.set_index("img_type")
        for i in imgt:
            s = df.loc[i]
            idx = s.file_index if i != "lght" else s.id
            d.update({f"{i}_filename": [s.file_name], f"{i}_index": [idx]})

        return pd.DataFrame(d)

    def _open_files(self, verbose=True):
        """
        Opens, read-only, every HDF5 file named in the ``<type>_filename``
        columns of ``self._samples`` and stores the handles in
        ``self._hdf_files`` keyed by file name.
        """
        hdf_filenames = []
        for img_type in self.data_types:
            names = self._samples[f"{img_type}_filename"].values
            hdf_filenames.extend(list(np.unique(names)))

        self._hdf_files = {}
        handles = self._hdf_files
        for fname in hdf_filenames:
            if verbose:
                print("Opening HDF5 file for reading", fname)
            handles[fname] = h5py.File(self.sevir_data_dir + "/" + fname, "r")

    def close(self):
        """
        Closes all open file handles
        """
        for f in self._hdf_files:
            self._hdf_files[f].close()
        self._hdf_files = {}

    @property
    def num_seq_per_event(self):
        return 1 + (self.raw_seq_len - self.seq_len) // self.stride

    @property
    def total_num_seq(self):
        """
        The total number of sequences within each shard.
        Notice that it is not the product of `self.num_seq_per_event` and `self.total_num_event`.
        """
        return int(self.num_seq_per_event * self.num_event)

    @property
    def total_num_event(self):
        """
        The total number of events in the whole dataset, before split into different shards.
        """
        return int(self._samples.shape[0])

    @property
    def start_event_idx(self):
        """
        The event idx used in certain rank should satisfy event_idx >= start_event_idx
        """
        return self.total_num_event // self.num_shard * self.rank

    @property
    def end_event_idx(self):
        """
        The event idx used in certain rank should satisfy event_idx < end_event_idx

        """
        if self.split_mode == "ceil":
            _last_start_event_idx = (
                self.total_num_event // self.num_shard * (self.num_shard - 1)
            )
            _num_event = self.total_num_event - _last_start_event_idx
            return self.start_event_idx + _num_event
        elif self.split_mode == "floor":
            return self.total_num_event // self.num_shard * (self.rank + 1)
        else:  # self.split_mode == 'uneven':
            if self.rank == self.num_shard - 1:  # the last process
                return self.total_num_event
            else:
                return self.total_num_event // self.num_shard * (self.rank + 1)

    @property
    def num_event(self):
        """
        The number of events split into each rank
        """
        return self.end_event_idx - self.start_event_idx

    def __len__(self):
        """
        Used only when self.sample_mode == 'sequent'
        """
        return self.total_num_seq // self.batch_size

    def _read_data(self, row, data):
        """
        Iteratively read data into data dict. Finally data[imgt] gets shape (batch_size, height, width, raw_seq_len).

        Args:
            row (Dict,optional): A series with fields IMGTYPE_filename, IMGTYPE_index, IMGTYPE_time_index.
            data (Dict,optional): , data[imgt] is a data tensor with shape = (tmp_batch_size, height, width, raw_seq_len).

        Returns:
            data (np.array): Updated data. Updated shape = (tmp_batch_size + 1, height, width, raw_seq_len).
        """

        imgtyps = np.unique([x.split("_")[0] for x in list(row.keys())])
        for t in imgtyps:
            fname = row[f"{t}_filename"]
            idx = row[f"{t}_index"]
            t_slice = slice(0, None)
            # Need to bin lght counts into grid
            if t == "lght":
                lght_data = self._hdf_files[fname][idx][:]
                data_i = self._lght_to_grid(lght_data, t_slice)
            else:
                data_i = self._hdf_files[fname][t][idx : idx + 1, :, :, t_slice]
            data[t] = (
                np.concatenate((data[t], data_i), axis=0) if (t in data) else data_i
            )
        return data

    def _lght_to_grid(self, data, t_slice=slice(0, None)):
        """
        Converts Nx5 lightning data matrix into a grid of per-pixel event counts.

        Column 0 of ``data`` is compared against ``self.lght_frame_times`` to pick
        the time bin; columns 3 and 4 are used as the x and y pixel coordinates.

        Args:
            data: 2-D array with one row per lightning event.
            t_slice (slice): When ``t_slice.stop`` is None all time bins are
                produced; otherwise only the single bin selected by
                ``t_slice.stop`` is produced.

        Returns:
            Array of shape ``(1, *out_size)`` where ``out_size`` is
            ``self.data_shape["lght"]`` followed by the number of time bins.
            NOTE: the early returns for empty input are float32 zeros, while the
            regular result is cast to int16.
        """
        # out_size = (48,48,len(self.lght_frame_times)-1) if isinstance(t_slice,(slice,)) else (48,48)
        # Last axis: one bin per frame time, or a single bin when one frame is requested.
        out_size = (
            (*self.data_shape["lght"], len(self.lght_frame_times))
            if t_slice.stop is None
            else (*self.data_shape["lght"], 1)
        )
        # No events at all: nothing to count.
        if data.shape[0] == 0:
            return np.zeros((1,) + out_size, dtype=np.float32)

        # filter out points outside the grid
        # NOTE(review): x is bounded by out_size[0] and y by out_size[1] here, while the
        # indices below are ravelled in (y, x, z) order; equivalent only for square grids.
        x, y = data[:, 3], data[:, 4]
        m = np.logical_and.reduce([x >= 0, x < out_size[0], y >= 0, y < out_size[1]])
        data = data[m, :]
        if data.shape[0] == 0:
            return np.zeros((1,) + out_size, dtype=np.float32)

        # Filter/separate times
        t = data[:, 0]
        if t_slice.stop is not None:  # select only one time bin
            if t_slice.stop > 0:
                if t_slice.stop < len(self.lght_frame_times):
                    # events falling between the previous frame time and the requested one
                    tm = np.logical_and(
                        t >= self.lght_frame_times[t_slice.stop - 1],
                        t < self.lght_frame_times[t_slice.stop],
                    )
                else:
                    # past the last known frame time: take everything after it
                    tm = t >= self.lght_frame_times[-1]
            else:  # special case:  frame 0 uses lght from frame 1
                tm = np.logical_and(
                    t >= self.lght_frame_times[0], t < self.lght_frame_times[1]
                )
            # tm=np.logical_and( (t>=FRAME_TIMES[t_slice],t<FRAME_TIMES[t_slice+1]) )

            data = data[tm, :]
            # single output bin, so every remaining event goes to z == 0
            z = np.zeros(data.shape[0], dtype=np.int64)
        else:  # compute z coordinate based on bin location times
            z = np.digitize(t, self.lght_frame_times) - 1
            z[z == -1] = 0  # special case:  frame 0 uses lght from frame 1

        x = data[:, 3].astype(np.int64)
        y = data[:, 4].astype(np.int64)

        # Flatten (y, x, z) into one index per event and count events per cell.
        k = np.ravel_multi_index(np.array([y, x, z]), out_size)
        n = np.bincount(k, minlength=np.prod(out_size))
        # Restore the grid shape and prepend a batch axis of length 1.
        return np.reshape(n, out_size).astype(np.int16)[np.newaxis, :]

    def _load_event_batch(self, event_idx, event_batch_size):
        """Loads a selected batch of events (not batch of sequences) into memory.

        Args:
            event_idx (int): The index of the event in the batch.
            event_batch_size (int): Event_batch[i] = all_type_i_available_events[idx:idx + event_batch_size]

        Returns:
            event_batch (List[np.array]): List of event batches.
                event_batch[i] is the event batch of the i-th data type.
                Each event_batch[i] is a np.ndarray with shape = (event_batch_size, height, width, raw_seq_len)
        """
        event_idx_slice_end = event_idx + event_batch_size
        pad_size = 0
        if event_idx_slice_end > self.end_event_idx:
            pad_size = event_idx_slice_end - self.end_event_idx
            event_idx_slice_end = self.end_event_idx
        pd_batch = self._samples.iloc[event_idx:event_idx_slice_end]
        data = {}
        for index, row in pd_batch.iterrows():
            data = self._read_data(row, data)
        if pad_size > 0:
            event_batch = []
            for t in self.data_types:
                pad_shape = [
                    pad_size,
                ] + list(data[t].shape[1:])
                data_pad = np.concatenate(
                    (
                        data[t].astype(self.output_type),
                        np.zeros(pad_shape, dtype=self.output_type),
                    ),
                    axis=0,
                )
                event_batch.append(data_pad)
        else:
            event_batch = [data[t].astype(self.output_type) for t in self.data_types]
        return event_batch

    def __iter__(self):
        """Return the dataset itself as the iterator object."""
        return self

    @staticmethod
    def preprocess_data_dict(
        data_dict, data_types=None, layout="NHWT", rescale="01"
    ) -> Dict[str, Union[np.ndarray, paddle.Tensor]]:
        """Rescale the selected entries of a data dict and convert their layout.

        Each selected entry is replaced in place by
        ``scale * (data.astype(float32) + offset)`` and then converted from the
        raw "NHWT" layout to ``layout``. Entries that are neither ``np.ndarray``
        nor ``paddle.Tensor`` are left as they are.

        Args:
            data_dict (Dict[str, Union[np.ndarray, paddle.Tensor]]): The dict of data.
            data_types (Sequence[str]): The data types that we want to rescale. This mainly excludes "mask" from preprocessing.
                Defaults to None, which selects every key of ``data_dict``.
            layout (str): consists of batch_size 'N', seq_len 'T', channel 'C', height 'H', width 'W'.
            rescale (str):
                'sevir': use the offsets and scale factors in original implementation.
                '01': scale all values to range 0 to 1, currently only supports 'vil'.

        Returns:
            data_dict (Dict[str, Union[np.ndarray, paddle.Tensor]]): preprocessed data.

        Raises:
            ValueError: If ``rescale`` is neither 'sevir' nor '01'.
        """

        if rescale == "sevir":
            scale_dict, offset_dict = PREPROCESS_SCALE_SEVIR, PREPROCESS_OFFSET_SEVIR
        elif rescale == "01":
            scale_dict, offset_dict = PREPROCESS_SCALE_01, PREPROCESS_OFFSET_01
        else:
            raise ValueError(f"Invalid rescale option: {rescale}.")

        selected = data_dict.keys() if data_types is None else data_types
        for key, data in data_dict.items():
            if key not in selected:
                continue
            if isinstance(data, np.ndarray):
                scale = scale_dict[key]
                rescaled = scale * (data.astype(np.float32) + offset_dict[key])
                data = change_layout_np(
                    data=rescaled, in_layout="NHWT", out_layout=layout
                )
            elif isinstance(data, paddle.Tensor):
                scale = scale_dict[key]
                rescaled = scale * (data.astype("float32") + offset_dict[key])
                data = change_layout_paddle(
                    data=rescaled, in_layout="NHWT", out_layout=layout
                )
            data_dict[key] = data
        return data_dict

    @staticmethod
    def process_data_dict_back(data_dict, data_types=None, rescale="01"):
        """Undo the value rescaling applied by ``preprocess_data_dict``.

        Each selected entry is cast to float32 and mapped to
        ``data / scale - offset`` with the per-key scale/offset of the chosen
        rescale method. The layout is left as it is. ``data_dict`` is updated in
        place and also returned.

        Args:
            data_dict: The dict of data; selected values must provide ``astype``.
            data_types: Keys to convert back. Defaults to all keys in ``data_dict``.
            rescale (str): 'sevir' or '01', selecting the scale/offset tables.

        Returns:
            The same ``data_dict`` with the selected entries converted back.

        Raises:
            ValueError: If ``rescale`` is neither 'sevir' nor '01'.
        """
        if rescale == "sevir":
            scale_dict, offset_dict = PREPROCESS_SCALE_SEVIR, PREPROCESS_OFFSET_SEVIR
        elif rescale == "01":
            scale_dict, offset_dict = PREPROCESS_SCALE_01, PREPROCESS_OFFSET_01
        else:
            raise ValueError(f"Invalid rescale option: {rescale}.")

        selected = data_dict.keys() if data_types is None else data_types
        for key in selected:
            restored = data_dict[key].astype("float32") / scale_dict[key]
            data_dict[key] = restored - offset_dict[key]
        return data_dict

    @staticmethod
    def data_dict_to_tensor(data_dict, data_types=None):
        """Build a new dict whose selected entries are paddle.Tensor copies.

        Args:
            data_dict: Mapping from key to paddle.Tensor or np.ndarray.
            data_types: Keys to convert. Defaults to all keys in ``data_dict``.

        Returns:
            A new dict. Selected tensors are detached clones (copy without grad),
            selected arrays are converted with ``paddle.to_tensor``, and all other
            entries are passed through as the same object.

        Raises:
            ValueError: If a selected value is neither paddle.Tensor nor np.ndarray.
        """
        selected = data_dict.keys() if data_types is None else data_types
        converted = {}
        for key, data in data_dict.items():
            if key not in selected:  # key == "mask"
                converted[key] = data
                continue
            if isinstance(data, paddle.Tensor):
                converted[key] = data.detach().clone()
            elif isinstance(data, np.ndarray):
                converted[key] = paddle.to_tensor(data)
            else:
                raise ValueError(
                    f"Invalid data type: {type(data)}. Should be paddle.Tensor or np.ndarray"
                )
        return converted

    @staticmethod
    def downsample_data_dict(
        data_dict, data_types=None, factors_dict=None, layout="NHWT"
    ) -> Dict[str, paddle.Tensor]:
        """Downsample entries of a data dict in time and space.

        The result is built with ``SEVIRDataset.data_dict_to_tensor``, so the
        input ``data_dict`` itself is not reassigned. Every key that has an entry
        in ``factors_dict`` is converted to "NTHW", strided along the time axis,
        average-pooled over height and width, and converted back to ``layout``.

        Args:
            data_dict (Dict[str, Union[np.array, paddle.Tensor]]): The dict of data.
            data_types (Optional[Sequence[str]]): Keys converted to tensors by
                ``data_dict_to_tensor``. Defaults to all keys in `data_dict`.
            factors_dict (Optional[Dict[str, Sequence[int]]]): each element `factors` is
                a Sequence of int, representing (t_factor, h_factor, w_factor).
            layout (str): Layout string, such as "NHWT".

        Returns:
            downsampled_data_dict (Dict[str, paddle.Tensor]): A new dict holding
                the downsampled data.
        """
        factors_dict = {} if factors_dict is None else factors_dict
        if data_types is None:
            data_types = data_dict.keys()

        result = SEVIRDataset.data_dict_to_tensor(
            data_dict=data_dict, data_types=data_types
        )  # make a copy
        for key in data_dict:
            factors = factors_dict.get(key)
            if factors is None:
                continue
            tensor = change_layout_paddle(
                data=result[key], in_layout=layout, out_layout="NTHW"
            )
            # downsample t dimension: keep every factors[0]-th step on axis 1
            time_stride = (
                slice(None, None),
                slice(None, None, factors[0]),
                slice(None, None),
                slice(None, None),
            )
            tensor = tensor[time_stride]
            # downsample spatial dimensions
            tensor = F.avg_pool2d(
                input=tensor,
                kernel_size=(factors[1], factors[2]),
            )
            result[key] = change_layout_paddle(
                data=tensor, in_layout="NTHW", out_layout=layout
            )

        return result

    def layout_to_in_out_slice(self):
        """Build per-axis slices that split a sequence into input and target parts.

        Both results hold one slice per character of ``self.layout``. All axes
        are taken in full except the 'T' axis: the input slice keeps the first
        ``self.in_len`` steps, and the output slice keeps the following
        ``self.out_len`` steps (everything after the input when ``self.out_len``
        is None).

        Returns:
            Tuple[List[slice], List[slice]]: ``(in_slice, out_slice)``.
        """
        t_axis = self.layout.find("T")
        # slice objects are immutable, so two independent lists are sufficient.
        in_slice = [slice(None, None) for _ in self.layout]
        out_slice = [slice(None, None) for _ in self.layout]
        in_slice[t_axis] = slice(None, self.in_len)
        out_stop = None if self.out_len is None else self.in_len + self.out_len
        out_slice[t_axis] = slice(self.in_len, out_stop)
        return in_slice, out_slice

    def __getitem__(self, index):
        """Assemble the ``index``-th batch of ``self.batch_size`` sequences.

        Sequences are numbered consecutively across events, with
        ``self.num_seq_per_event`` sequences per event. The events covered by
        the batch are loaded once, every sequence window is cut out along the
        last axis, and the windows are stacked along axis 0. The stacked data is
        converted to tensors, optionally preprocessed and downsampled, and the
        "vil" entry is finally split along the time axis into input and label.

        Args:
            index (int): Index of the batch.

        Returns:
            Tuple[dict, dict, dict]: ``(input_item, label_item, weight_item)``
                where ``input_item`` maps ``self.input_keys[0]`` to the input
                frames, ``label_item`` maps ``self.label_keys[0]`` to the target
                frames, and ``weight_item`` is ``self.weight_dict``.
        """
        # (event_idx, seq_idx) coordinates of every sequence in this batch.
        first_seq = index * self.batch_size
        sampled_idx_list = [
            divmod(flat_idx, self.num_seq_per_event)
            for flat_idx in range(first_seq, first_seq + self.batch_size)
        ]

        start_event_idx = sampled_idx_list[0][0]
        event_batch_size = sampled_idx_list[-1][0] - start_event_idx + 1

        event_batch = self._load_event_batch(
            event_idx=start_event_idx, event_batch_size=event_batch_size
        )

        # Collect the windows per data type and concatenate once at the end
        # instead of re-allocating the growing array for every sample.
        sampled_seqs = {imgt: [] for imgt in self.data_types}
        for event_idx, seq_idx in sampled_idx_list:
            batch_slice = [
                event_idx - start_event_idx,
            ]  # use [] to keepdim
            seq_start = seq_idx * self.stride
            seq_slice = slice(seq_start, seq_start + self.seq_len)
            for imgt_idx, imgt in enumerate(self.data_types):
                sampled_seqs[imgt].append(
                    event_batch[imgt_idx][batch_slice, :, :, seq_slice]
                )
        ret_dict = {
            imgt: np.concatenate(seqs, axis=0) for imgt, seqs in sampled_seqs.items()
        }

        ret_dict = self.data_dict_to_tensor(
            data_dict=ret_dict, data_types=self.data_types
        )
        if self.preprocess:
            ret_dict = self.preprocess_data_dict(
                data_dict=ret_dict,
                data_types=self.data_types,
                layout=self.layout,
                rescale=self.rescale_method,
            )

        if self.downsample_dict is not None:
            ret_dict = self.downsample_data_dict(
                data_dict=ret_dict,
                data_types=self.data_types,
                factors_dict=self.downsample_dict,
                layout=self.layout,
            )
        in_slice, out_slice = self.layout_to_in_out_slice()
        # Only the "vil" entry is used to build the sample.
        data_seq = ret_dict["vil"]
        if isinstance(data_seq, paddle.Tensor):
            data_seq = data_seq.numpy()
        # Index with the whole slice list so that layouts of any length work,
        # not only five-character ones.
        x = data_seq[tuple(in_slice)]
        y = data_seq[tuple(out_slice)]

        weight_item = self.weight_dict
        input_item = {self.input_keys[0]: x}
        label_item = {
            self.label_keys[0]: y,
        }

        return input_item, label_item, weight_item

end_event_idx property

Exclusive upper bound of the event indices assigned to this rank: every event used by the rank satisfies event_idx < end_event_idx.

num_event property

The number of events split into each rank

start_event_idx property

Inclusive lower bound of the event indices assigned to this rank: every event used by the rank satisfies event_idx >= start_event_idx.

total_num_event property

The total number of events in the whole dataset, before split into different shards.

total_num_seq property

The total number of sequences within each shard. Notice that it is not the product of self.num_seq_per_event and self.total_num_event.

__len__()

Used only when self.sample_mode == 'sequent'

Source code in ppsci/data/dataset/sevir_dataset.py
def __len__(self):
    """
    Number of complete batches of ``self.batch_size`` sequences in this shard.

    Used only when self.sample_mode == 'sequent'
    """
    # Sequences that do not fill a whole batch are not counted.
    full_batches, _ = divmod(self.total_num_seq, self.batch_size)
    return full_batches

close()

Closes all open file handles

Source code in ppsci/data/dataset/sevir_dataset.py
def close(self):
    """
    Closes all open file handles and resets the handle cache to an empty dict.
    """
    for handle in self._hdf_files.values():
        handle.close()
    self._hdf_files = {}

data_dict_to_tensor(data_dict, data_types=None) staticmethod

Convert each element in data_dict to paddle.Tensor (copy without grad).

Source code in ppsci/data/dataset/sevir_dataset.py
@staticmethod
def data_dict_to_tensor(data_dict, data_types=None):
    """Build a new dict whose selected entries are paddle.Tensor copies.

    Args:
        data_dict: Mapping from key to paddle.Tensor or np.ndarray.
        data_types: Keys to convert. Defaults to all keys in ``data_dict``.

    Returns:
        A new dict. Selected tensors are detached clones (copy without grad),
        selected arrays are converted with ``paddle.to_tensor``, and all other
        entries are passed through as the same object.

    Raises:
        ValueError: If a selected value is neither paddle.Tensor nor np.ndarray.
    """
    selected = data_dict.keys() if data_types is None else data_types
    converted = {}
    for key, data in data_dict.items():
        if key not in selected:  # key == "mask"
            converted[key] = data
            continue
        if isinstance(data, paddle.Tensor):
            converted[key] = data.detach().clone()
        elif isinstance(data, np.ndarray):
            converted[key] = paddle.to_tensor(data)
        else:
            raise ValueError(
                f"Invalid data type: {type(data)}. Should be paddle.Tensor or np.ndarray"
            )
    return converted

downsample_data_dict(data_dict, data_types=None, factors_dict=None, layout='NHWT') staticmethod

The downsample of data.

Parameters:

Name Type Description Default
data_dict Dict[str, Union[array, Tensor]]

The dict of data.

required
data_types Optional[Sequence[str]]

Data types to be downsampled. Defaults to all keys in data_dict.

None
factors_dict Optional[Dict[str, Sequence[int]]]

each element factors is a Sequence of int, representing (t_factor, h_factor, w_factor).

None
layout str

Layout string, such as "NHWT".

'NHWT'

Returns:

Name Type Description
downsampled_data_dict Dict[str, Tensor]

Modify on a deep copy of data_dict instead of directly modifying the original data_dict.

Source code in ppsci/data/dataset/sevir_dataset.py
@staticmethod
def downsample_data_dict(
    data_dict, data_types=None, factors_dict=None, layout="NHWT"
) -> Dict[str, paddle.Tensor]:
    """Downsample entries of a data dict in time and space.

    The result is built with ``SEVIRDataset.data_dict_to_tensor``, so the
    input ``data_dict`` itself is not reassigned. Every key that has an entry
    in ``factors_dict`` is converted to "NTHW", strided along the time axis,
    average-pooled over height and width, and converted back to ``layout``.

    Args:
        data_dict (Dict[str, Union[np.array, paddle.Tensor]]): The dict of data.
        data_types (Optional[Sequence[str]]): Keys converted to tensors by
            ``data_dict_to_tensor``. Defaults to all keys in `data_dict`.
        factors_dict (Optional[Dict[str, Sequence[int]]]): each element `factors` is
            a Sequence of int, representing (t_factor, h_factor, w_factor).
        layout (str): Layout string, such as "NHWT".

    Returns:
        downsampled_data_dict (Dict[str, paddle.Tensor]): A new dict holding
            the downsampled data.
    """
    factors_dict = {} if factors_dict is None else factors_dict
    if data_types is None:
        data_types = data_dict.keys()

    result = SEVIRDataset.data_dict_to_tensor(
        data_dict=data_dict, data_types=data_types
    )  # make a copy
    for key in data_dict:
        factors = factors_dict.get(key)
        if factors is None:
            continue
        tensor = change_layout_paddle(
            data=result[key], in_layout=layout, out_layout="NTHW"
        )
        # downsample t dimension: keep every factors[0]-th step on axis 1
        time_stride = (
            slice(None, None),
            slice(None, None, factors[0]),
            slice(None, None),
            slice(None, None),
        )
        tensor = tensor[time_stride]
        # downsample spatial dimensions
        tensor = F.avg_pool2d(
            input=tensor,
            kernel_size=(factors[1], factors[2]),
        )
        result[key] = change_layout_paddle(
            data=tensor, in_layout="NTHW", out_layout=layout
        )

    return result

preprocess_data_dict(data_dict, data_types=None, layout='NHWT', rescale='01') staticmethod

The preprocess of data dict.

Parameters:

Name Type Description Default
data_dict Dict[str, Union[ndarray, Tensor]]

The dict of data.

required
data_types Sequence[str]

The data types that we want to rescale. This mainly excludes "mask" from preprocessing.

None
layout str

consists of batch_size 'N', seq_len 'T', channel 'C', height 'H', width 'W'.

'NHWT'
rescale str

'sevir': use the offsets and scale factors in original implementation. '01': scale all values to range 0 to 1, currently only supports 'vil'.

'01'

Returns:

Name Type Description
data_dict Dict[str, Union[ndarray, Tensor]]

preprocessed data.

Source code in ppsci/data/dataset/sevir_dataset.py
@staticmethod
def preprocess_data_dict(
    data_dict, data_types=None, layout="NHWT", rescale="01"
) -> Dict[str, Union[np.ndarray, paddle.Tensor]]:
    """Rescale the selected entries of a data dict and convert their layout.

    Each selected entry is cast to float32, mapped to
    ``scale * (data + offset)`` with the per-key scale/offset of the chosen
    rescale method, and then converted from "NHWT" to ``layout``.
    ``data_dict`` is updated in place and also returned.

    Args:
        data_dict (Dict[str, Union[np.ndarray, paddle.Tensor]]): The dict of data.
        data_types (Sequence[str]): The data types that we want to rescale.
            This mainly excludes "mask" from preprocessing. Defaults to all keys.
        layout (str): Target layout; consists of batch_size 'N', seq_len 'T',
            channel 'C', height 'H', width 'W'.
        rescale (str):
            'sevir': use the offsets and scale factors in original implementation.
            '01': scale all values to range 0 to 1, currently only supports 'vil'.

    Returns:
        data_dict (Dict[str, Union[np.ndarray, paddle.Tensor]]): preprocessed data.

    Raises:
        ValueError: If ``rescale`` is neither 'sevir' nor '01'.
    """
    if rescale == "sevir":
        scale_dict, offset_dict = PREPROCESS_SCALE_SEVIR, PREPROCESS_OFFSET_SEVIR
    elif rescale == "01":
        scale_dict, offset_dict = PREPROCESS_SCALE_01, PREPROCESS_OFFSET_01
    else:
        raise ValueError(f"Invalid rescale option: {rescale}.")

    selected = data_dict.keys() if data_types is None else data_types
    # Iterate over a snapshot of the keys; values are replaced in place.
    for key in list(data_dict):
        if key not in selected:
            continue
        data = data_dict[key]
        if isinstance(data, np.ndarray):
            rescaled = scale_dict[key] * (
                data.astype(np.float32) + offset_dict[key]
            )
            data = change_layout_np(
                data=rescaled, in_layout="NHWT", out_layout=layout
            )
        elif isinstance(data, paddle.Tensor):
            rescaled = scale_dict[key] * (
                data.astype("float32") + offset_dict[key]
            )
            data = change_layout_paddle(
                data=rescaled, in_layout="NHWT", out_layout=layout
            )
        # Values of any other type are written back unchanged.
        data_dict[key] = data
    return data_dict

SphericalSWEDataset

Bases: Dataset

Loads a Spherical Shallow Water equations dataset

Training contains 200 samples in resolution 32x64. Testing contains 50 samples at resolution 32x64 and 50 samples at resolution 64x128.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
data_dir str

The directory to load data from.

required
weight_dict Optional[Dict[str, float]]

Define the weight of each constraint variable. Defaults to None.

None
test_resolutions Tuple[str, ...]

The resolutions to test dataset. Defaults to ["34x64", "64x128"].

['34x64', '64x128']
train_resolution str

The resolutions to train dataset. Defaults to "34x64".

'34x64'
data_split str

Specify the dataset split: either 'train', 'test_32x64', or 'test_64x128'. Defaults to "train".

'train'
Source code in ppsci/data/dataset/spherical_swe_dataset.py
class SphericalSWEDataset(io.Dataset):
    """Loads a Spherical Shallow Water equations dataset

    Training contains 200 samples in resolution 32x64.
    Testing contains 50 samples at resolution 32x64 and 50 samples at resolution 64x128.

    All three splits are read from ``data_dir`` at construction time, from the
    files ``train_SWE_{train_resolution}.npy``, ``test_SWE_{test_resolutions[0]}.npy``
    and ``test_SWE_{test_resolutions[1]}.npy``. ``data_split`` selects which of
    them is served by ``__len__`` and ``__getitem__``.

    Args:
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        data_dir (str): The directory to load data from.
        weight_dict (Optional[Dict[str, float]], optional): Define the weight of each constraint variable.
            Keys of ``label_keys`` that are not given get weight 1.0. Defaults to None,
            which yields an empty weight dict.
        test_resolutions (Tuple[str, ...], optional): Resolutions of the two test files.
            Defaults to ("34x64", "64x128").
        train_resolution (str, optional): Resolution of the training file. Defaults to "34x64".
        data_split (str, optional): Specify the dataset split, either 'train', 'test_32x64', or 'test_64x128'.
            Any value other than 'train' and 'test_32x64' selects the second test set.
            Defaults to "train".
    """

    # NOTE(review): the defaults say "34x64" while the description above and the
    # split name "test_32x64" say 32x64; confirm which file names are intended.
    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        weight_dict: Optional[Dict[str, float]] = None,
        test_resolutions: Tuple[str, ...] = ("34x64", "64x128"),
        train_resolution: str = "34x64",
        data_split: str = "train",
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.data_dir = data_dir
        if weight_dict is None:
            self.weight_dict = {}
        else:
            # Every label defaults to weight 1.0; explicit entries override it.
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        self.test_resolutions = test_resolutions
        self.train_resolution = train_resolution
        self.data_split = data_split

        data_root = Path(self.data_dir)
        # train path
        path_train = data_root.joinpath(
            f"train_SWE_{self.train_resolution}.npy"
        ).as_posix()
        self.x_train, self.y_train = self.read_data(path_train)
        # test paths
        path_test_1 = data_root.joinpath(
            f"test_SWE_{self.test_resolutions[0]}.npy"
        ).as_posix()
        self.x_test_1, self.y_test_1 = self.read_data(path_test_1)
        path_test_2 = data_root.joinpath(
            f"test_SWE_{self.test_resolutions[1]}.npy"
        ).as_posix()
        self.x_test_2, self.y_test_2 = self.read_data(path_test_2)

    def read_data(self, path):
        """Load the "x" and "y" arrays stored in a ``.npy`` file as float32.

        Args:
            path (str): Path of a ``.npy`` file holding a pickled mapping with
                the entries "x" and "y".

        Returns:
            Tuple[np.ndarray, np.ndarray]: ``(x, y)`` cast to float32.
        """
        # SECURITY: allow_pickle=True unpickles the file content, which can run
        # arbitrary code. Only load files from a trusted source.
        data = np.load(path, allow_pickle=True).item()
        x = data["x"].astype("float32")
        y = data["y"].astype("float32")
        del data
        return x, y

    def _split_arrays(self):
        """Return the ``(x, y)`` arrays of the split selected by ``self.data_split``."""
        if self.data_split == "train":
            return self.x_train, self.y_train
        if self.data_split == "test_32x64":
            return self.x_test_1, self.y_test_1
        # Every other value falls back to the second test set.
        return self.x_test_2, self.y_test_2

    def __len__(self):
        """Return the number of samples in the selected split."""
        x, _ = self._split_arrays()
        return x.shape[0]

    def __getitem__(self, index):
        """Return ``(input_item, label_item, weight_item)`` for one sample.

        ``input_item`` maps ``input_keys[0]`` to the input array, ``label_item``
        maps ``label_keys[0]`` to the target array, and ``weight_item`` is the
        weight dict of the dataset.
        """
        x_all, y_all = self._split_arrays()
        x = x_all[index]
        y = y_all[index]

        input_item = {self.input_keys[0]: x}
        label_item = {self.label_keys[0]: y}
        weight_item = self.weight_dict

        return input_item, label_item, weight_item

STAFNetDataset

Bases: Dataset

Dataset class for STAFNet data.

Parameters:

Name Type Description Default
file_path str

Path to the dataset file.

required
input_keys Optional[Tuple[str, ...]]

Tuple of input keys. Defaults to None.

None
label_keys Optional[Tuple[str, ...]]

Tuple of label keys. Defaults to None.

None
seq_len int

Sequence length. Defaults to 72.

72
pred_len int

Prediction length. Defaults to 48.

48
use_edge_attr bool

Whether to use edge attributes. Defaults to True.

True

Examples:

>>> from ppsci.data.dataset import STAFNetDataset
>>> dataset = STAFNetDataset(file_path='example.pkl')
>>> # get the length of the dataset
>>> dataset_size = len(dataset)
>>> # get the first sample of the data
>>> first_sample = dataset[0]
>>> print("First sample:", first_sample)
Source code in ppsci/data/dataset/stafnet_dataset.py
class STAFNetDataset(io.Dataset):
    """Dataset class for STAFNet data.

    The dataset file is a pickle holding the entries "metedata", "AQdata",
    "AQStation_imformation" and "meteStation_imformation". Each sample is a
    window of ``seq_len + pred_len`` consecutive steps taken from "AQdata" and
    "metedata".

    Args:
        file_path (str): Path to the dataset file. Must end with ".pkl".
        input_keys (Optional[Tuple[str, ...]]): Tuple of input keys. Defaults to None.
        label_keys (Optional[Tuple[str, ...]]): Tuple of label keys. Defaults to None.
            ``label_keys[0]`` is required when samples are fetched.
        seq_len (int): Sequence length. Defaults to 72.
        pred_len (int): Prediction length. Defaults to 48.
        use_edge_attr (bool): Whether to use edge attributes. Defaults to True.

    Raises:
        ValueError: If ``file_path`` does not end with ".pkl".

    Examples:
        >>> from ppsci.data.dataset import STAFNetDataset

        >>> dataset = STAFNetDataset(file_path='example.pkl') # doctest: +SKIP

        >>> # get the length of the dataset
        >>> dataset_size = len(dataset) # doctest: +SKIP
        >>> # get the first sample of the data
        >>> first_sample = dataset[0] # doctest: +SKIP
        >>> print("First sample:", first_sample) # doctest: +SKIP
    """

    def __init__(
        self,
        file_path: str,
        input_keys: Optional[Tuple[str, ...]] = None,
        label_keys: Optional[Tuple[str, ...]] = None,
        seq_len: int = 72,
        pred_len: int = 48,
        use_edge_attr: bool = True,
    ):

        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.use_edge_attr = use_edge_attr

        self.seq_len = seq_len
        self.pred_len = pred_len

        super().__init__()
        # Fail early with a clear message instead of hitting an AttributeError
        # on the never-assigned ``self.data`` below.
        if not file_path.endswith(".pkl"):
            raise ValueError(
                f"Unsupported dataset file '{file_path}': expected a '.pkl' file."
            )
        # SECURITY: unpickling can execute arbitrary code. Only load dataset
        # files from a trusted source.
        with open(file_path, "rb") as f:
            self.data = pandas.read_pickle(f)
        self.metedata = self.data["metedata"]
        self.AQdata = self.data["AQdata"]
        self.AQStation_imformation = self.data["AQStation_imformation"]
        self.meteStation_imformation = self.data["meteStation_imformation"]

        # Station coordinates: the "经度" (longitude) and "纬度" (latitude)
        # columns for the meteorological stations, the last two columns for
        # the air-quality stations.
        mete_coords = np.array(
            self.meteStation_imformation.loc[:, ["经度", "纬度"]],
        ).astype("float32")
        AQ_coords = np.array(self.AQStation_imformation.iloc[:, -2:]).astype("float32")

        # The coordinate arrays are computed once and shared by the graph
        # construction and the nearest-station lookup.
        self.aq_edge_index, self.aq_edge_attr, self.aq_node_coords = self.get_edge_attr(
            AQ_coords
        )
        (
            self.mete_edge_index,
            self.mete_edge_attr,
            self.mete_node_coords,
        ) = self.get_edge_attr(mete_coords)

        # For every air-quality station, index of the nearest meteorological station.
        self.lut = self.find_nearest_point(AQ_coords, mete_coords)

    def __len__(self):
        """Return ``len(AQdata) - seq_len - pred_len``.

        NOTE(review): a window starting at index ``len(AQdata) - seq_len - pred_len``
        would still fit, so the last full window appears to be excluded; confirm
        whether that is intended.
        """
        return len(self.AQdata) - self.seq_len - self.pred_len

    def __getitem__(self, idx):
        """Return the ``idx``-th window as ``(input_item, label_item, weight_item)``.

        ``input_item`` holds the float32 tensors "aq_train_data" and
        "mete_train_data" covering ``seq_len + pred_len`` steps. ``label_item``
        maps ``label_keys[0]`` to the last ``pred_len`` steps and last 7 features
        of the air-quality window. ``weight_item`` is an empty dict.
        """
        window_end = idx + self.seq_len + self.pred_len
        aq_train_data = paddle.to_tensor(
            data=self.AQdata[idx:window_end],
            dtype="float32",
        )
        mete_train_data = paddle.to_tensor(
            data=self.metedata[idx:window_end],
            dtype="float32",
        )

        input_item = {
            "aq_train_data": aq_train_data,
            "mete_train_data": mete_train_data,
        }
        label_item = {self.label_keys[0]: aq_train_data[-self.pred_len :, :, -7:]}

        return input_item, label_item, {}

    def get_edge_attr(self, node_coords, threshold=0.2):
        """Connect all node pairs closer than ``threshold`` and describe the edges.

        Args:
            node_coords (np.ndarray): Node coordinates, one row per node.
            threshold (float): Pairs with a distance below this value are
                connected. A node is at distance 0 from itself, so self-loops are
                included for any positive threshold. Defaults to 0.2.

        Returns:
            Tuple: ``(edge_index, edge_attr, node_coords)`` where ``edge_index``
                is the ``(start_nodes, end_nodes)`` tuple returned by ``np.where``,
                ``edge_attr`` is a tensor whose rows are the edge length followed
                by the coordinate difference end - start, and ``node_coords`` is
                the coordinate array as a tensor.
        """
        dist_matrix = cdist(node_coords, node_coords)
        edge_index = np.where(dist_matrix < threshold)
        start_nodes, end_nodes = edge_index
        edge_lengths = dist_matrix[start_nodes, end_nodes]
        edge_directions = node_coords[end_nodes] - node_coords[start_nodes]
        edge_attr = paddle.to_tensor(
            data=np.concatenate((edge_lengths[:, np.newaxis], edge_directions), axis=1)
        )
        node_coords = paddle.to_tensor(data=node_coords)
        return edge_index, edge_attr, node_coords

    def find_nearest_point(self, A, B):
        """Return, for every point of ``A``, the index of its nearest point in ``B``.

        Distances are Euclidean norms of the coordinate differences; ties are
        resolved by ``np.argmin``, i.e. the first minimum wins.

        Args:
            A: Iterable of query points.
            B: Iterable of candidate points.

        Returns:
            list: One index into ``B`` per point of ``A``.
        """
        nearest_indices = []
        for a in A:
            distances = [np.linalg.norm(a - b) for b in B]
            nearest_indices.append(np.argmin(distances))
        return nearest_indices

build_dataset(cfg)

Build dataset

Parameters:

Name Type Description Default
cfg Union[DictConfig, Dataset]

Dataset config or dataset.

required

Returns:

Type Description
Dataset

io.Dataset: The dataset built from the config, or cfg itself when it is already a dataset instance.

Source code in ppsci/data/dataset/__init__.py
def build_dataset(cfg: Union[DictConfig, io.Dataset]) -> io.Dataset:
    """Build a dataset from a config, or pass an existing dataset through.

    The config is deep-copied before use. Its "name" entry selects the dataset
    class, an optional "transforms" entry is replaced by the result of
    ``transform.build_transforms``, and all remaining entries are passed to the
    class as keyword arguments.

    Args:
        cfg (Union[DictConfig, io.Dataset]): Dataset config or dataset.

    Returns:
        io.Dataset: The built dataset, or ``cfg`` itself if it already is a dataset.

    Raises:
        NameError: If the dataset name cannot be resolved (re-raised after a
            registration hint has been logged).
    """
    # If cfg is already a Dataset instance, return it directly
    if isinstance(cfg, io.Dataset):
        return cfg

    # Work on a copy so that the pops below do not alter the caller's config.
    cfg = copy.deepcopy(cfg)

    dataset_cls = cfg.pop("name")
    if "transforms" in cfg:
        cfg["transforms"] = transform.build_transforms(cfg.pop("transforms"))

    try:
        # SECURITY: eval() executes the configured name as a Python expression
        # in this function's scope; dataset configs must come from a trusted
        # source.
        dataset = eval(dataset_cls)(**cfg)
    except NameError:
        import textwrap

        # Log a hint on how to register a custom dataset class, then re-raise.
        logger.error(
            f"name {dataset_cls} is not defined, maybe you should register your dataset class first as below:\n"
            + textwrap.indent(
                "\n"
                "import paddle\n"
                "from ppsci.data import register_to_dataset\n"
                "\n"
                "@register_to_dataset\n"
                "class MyDataset(paddle.io.Dataset):\n"
                "    pass\n"
                "\n",
                prefix=" " * 4,
            )
        )
        raise

    logger.debug(str(dataset))

    return dataset