
Latent Neural Operator (LatentNO, or LNO)

# Darcy
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Darcy_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Darcy_$f.npy --create-dirs -o ./datas/Darcy_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Darcy.yaml

# Elasticity
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Elasticity_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Elasticity_$f.npy --create-dirs -o ./datas/Elasticity_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Elasticity.yaml

# Pipe
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Pipe_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Pipe_$f.npy --create-dirs -o ./datas/Pipe_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Pipe.yaml

# NS2d
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/NS2d_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/NS2d_$f.npy --create-dirs -o ./datas/NS2d_$f.npy}
python LatentNO-time.py --config-name=LatentNO-NS2d.yaml

# Evaluation with the pretrained models (mode=eval)
# Darcy
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Darcy_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Darcy_$f.npy --create-dirs -o ./datas/Darcy_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Darcy.yaml mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/LatentNO/LatentNO_Darcy_pretrained.pdparams

# Elasticity
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Elasticity_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Elasticity_$f.npy --create-dirs -o ./datas/Elasticity_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Elasticity.yaml mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/LatentNO/LatentNO_Elasticity_pretrained.pdparams

# Pipe
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Pipe_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Pipe_$f.npy --create-dirs -o ./datas/Pipe_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Pipe.yaml mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/LatentNO/LatentNO_Pipe_pretrained.pdparams

# NS2d
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/NS2d_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/NS2d_$f.npy --create-dirs -o ./datas/NS2d_$f.npy}
python LatentNO-time.py --config-name=LatentNO-NS2d.yaml mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/LatentNO/LatentNO_NS2d_pretrained.pdparams
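
For readers who prefer a single cross-platform script to separate wget and curl commands, the downloads above can be sketched with the Python standard library. The URL pattern and the ./datas target directory are taken from the commands above; the helper names `dataset_urls` and `download_dataset` are illustrative and are not part of PaddleScience. Unlike `wget -c`, this sketch does not resume partial downloads; it only skips files that already exist.

```python
import os
import urllib.request

BASE_URL = "https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO"


def dataset_urls(name):
    """Return {local file name: URL} for the train and val splits of one dataset."""
    return {
        f"{name}_{split}.npy": f"{BASE_URL}/{name}_{split}.npy"
        for split in ("train", "val")
    }


def download_dataset(name, target_dir="./datas"):
    """Download both splits into target_dir, skipping files that already exist."""
    os.makedirs(target_dir, exist_ok=True)
    for file_name, url in dataset_urls(name).items():
        path = os.path.join(target_dir, file_name)
        if not os.path.exists(path):
            urllib.request.urlretrieve(url, path)
    return target_dir


# Usage (requires network access): download_dataset("Darcy")
# Other dataset names used in this example: "Elasticity", "Pipe", "NS2d".
```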

1. Background

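The forward problem for a partial differential equation (PDE) is to find the solution function given the explicit form of the equation together with its initial and boundary conditions. Such problems can be cast uniformly as operator learning tasks and therefore fit a sequence-to-sequence framework. A neural operator learns the mapping from input functions to output functions from paired training data in a data-driven way, with both functions represented as sequences of sample points.

In recent years the Transformer architecture has come to dominate neural operator design. The attention mechanism models long-range nonlinear interactions among all elements of a sequence, which matches the sequence-to-sequence representation used in PDE solving and gives more accurate models than conventional fully connected structures. However, the time complexity of attention is quadratic in the sequence length, so the cost of an attention-based neural operator rises sharply as the number of sample points grows. To reduce this cost, one line of existing work replaces standard attention with linear-complexity variants, but their limited modeling capacity often sacrifices solution accuracy. Another line of work solves the PDE in a latent space with a small number of physical features, which avoids the intricate interactions among the many sample points in the original geometric space and captures the relations among physical features in a compact latent space. These methods, however, either rely on hand-specified basis functions or fail to build a persistent latent space.

This example therefore introduces the physics cross-attention module, which decouples the positions of the input observation samples from those of the output samples to be predicted and learns a persistent latent space from data. The latent neural operator model is then built on top of this module.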

Figure: architecture of the latent neural operator

2. Implementation of the latent neural operator

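This section explains how to build, train, test, and evaluate the latent neural operator model with PaddleScience. The example has the following directory structure.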

LatentNO/
├── config
│   ├── LatentNO-Darcy.yaml
│   └── ...
├── datas
│   ├── Darcy_train.npy
│   ├── Darcy_val.npy
│   └── ...
├── LatentNO-steady.py
├── LatentNO-time.py
└── utils.py

2.1 Dataset construction and loading

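The datasets in this example fall into two categories according to the task: steady-state data (Darcy, Pipe, Elasticity) and time-dependent data (NS2d). To fit the automatic training workflow of PaddleScience, the example implements two dedicated dataset classes: LatentNODataset for the steady-state case and LatentNODataset_time for the time-dependent case. The construction of LatentNODataset is described first.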

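For steady-state tasks, the data is stored as .npy files in the ./datas directory. Each file is named after the dataset and the split (training or validation), for example Darcy_train.npy and Darcy_val.npy. Each file stores a dictionary with three key variables x, y1, and y2: x and y1 are both model inputs, and y2 is the prediction target. At load time the arrays are reshaped to the layout the model expects, x is concatenated with y1 when required, and the results are converted to Paddle tensors.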

ppsci/data/dataset/latent_no_dataset.py
data_file = osp.join("datas", f"{data_name}_{data_mode}.npy")
if not os.path.exists(data_file):
    raise FileNotFoundError(f"Data file not found: {data_file}")

dataset = np.load(data_file, allow_pickle=True).tolist()

x = np.array(dataset["x"], dtype=np.float32)
y1 = np.array(dataset["y1"], dtype=np.float32)
y2 = np.array(dataset["y2"], dtype=np.float32)

x = np.reshape(x, (x.shape[0], -1, x.shape[-1]))
y1 = np.reshape(y1, (y1.shape[0], -1, y1.shape[-1]))
y2 = np.reshape(y2, (y2.shape[0], -1, y2.shape[-1]))

if data_concat:
    y1 = np.concatenate((x, y1), axis=-1)

x_tensor = paddle.to_tensor(x)
y1_tensor = paddle.to_tensor(y1)
y2_tensor = paddle.to_tensor(y2)
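
The loading steps can be tried on synthetic data without downloading anything. The sketch below assumes a made-up file, Toy_train.npy, with 4 samples on an 8x8 grid, 2 coordinate channels in x, and 1 channel each in y1 and y2; these sizes are illustrative and do not describe the real datasets. It stops before the conversion to Paddle tensors, so only NumPy is needed.

```python
import os
import tempfile

import numpy as np

# Synthetic stand-in for a file such as Darcy_train.npy (all sizes are made up).
rng = np.random.default_rng(0)
toy = {
    "x": rng.random((4, 8, 8, 2)),   # coordinates on an 8x8 grid
    "y1": rng.random((4, 8, 8, 1)),  # observed input field
    "y2": rng.random((4, 8, 8, 1)),  # target field
}

data_file = os.path.join(tempfile.mkdtemp(), "Toy_train.npy")
np.save(data_file, toy, allow_pickle=True)  # the dict is stored as a pickled object

# Same steps as the excerpt above: load the dict, flatten the grid, concatenate.
dataset = np.load(data_file, allow_pickle=True).tolist()
x = np.array(dataset["x"], dtype=np.float32)
y1 = np.array(dataset["y1"], dtype=np.float32)
x = np.reshape(x, (x.shape[0], -1, x.shape[-1]))
y1 = np.reshape(y1, (y1.shape[0], -1, y1.shape[-1]))
y1_cat = np.concatenate((x, y1), axis=-1)  # what data_concat=True produces

print(x.shape, y1.shape, y1_cat.shape)  # (4, 64, 2) (4, 64, 1) (4, 64, 3)
```

The grid dimensions collapse into a single point axis of length 64, which is the sequence layout (batch, points, channels) that the model consumes.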

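To improve training stability and generalization, the dataset class has a built-in normalization module. It computes the mean and standard deviation of each variable at initialization and normalizes the data automatically at load time. It also provides an inverse transform, so that values can be restored to their physical scale for inference or visualization.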

ppsci/data/dataset/latent_no_dataset.py
self.normalizer = Normalizer(x_tensor, y1_tensor, y2_tensor)

if data_normalize:
    x = self.normalizer.apply_x(x_tensor, "cpu").numpy()
    y1 = self.normalizer.apply_y1(y1_tensor, "cpu").numpy()
    y2 = self.normalizer.apply_y2(y2_tensor, "cpu").numpy()

ppsci/data/dataset/latent_no_dataset.py
class Normalizer:
    def __init__(self, x, y1, y2):
        self.x_flag = False
        self.y1_flag = False
        self.y2_flag = False
        old_x_shape = x.shape
        old_y1_shape = y1.shape
        old_y2_shape = y2.shape
        x = paddle.reshape(x, (-1, x.shape[-1]))
        y1 = paddle.reshape(y1, (-1, y1.shape[-1]))
        y2 = paddle.reshape(y2, (-1, y2.shape[-1]))
        self.x_mean = paddle.mean(x, axis=0)
        self.x_std = paddle.std(x, axis=0) + 1e-8
        self.y1_mean = paddle.mean(y1, axis=0)
        self.y1_std = paddle.std(y1, axis=0) + 1e-8
        self.y2_mean = paddle.mean(y2, axis=0)
        self.y2_std = paddle.std(y2, axis=0) + 1e-8
        x = paddle.reshape(x, old_x_shape)
        y1 = paddle.reshape(y1, old_y1_shape)
        y2 = paddle.reshape(y2, old_y2_shape)

    def is_apply_x(self):
        return self.x_flag

    def is_apply_y1(self):
        return self.y1_flag

    def is_apply_y2(self):
        return self.y2_flag

    def apply_x(self, x, device, inverse=False):
        self.x_mean = self.x_mean.to(device)
        self.x_std = self.x_std.to(device)

        old_x_shape = x.shape
        x = paddle.reshape(x, (-1, x.shape[-1]))
        if not inverse:
            x = (x - self.x_mean) / self.x_std
            self.x_flag = True
        else:
            x = x * self.x_std + self.x_mean
        x = paddle.reshape(x, old_x_shape)
        return x

    def apply_y1(self, y1, device, inverse=False):
        self.y1_mean = self.y1_mean.to(device)
        self.y1_std = self.y1_std.to(device)

        old_y1_shape = y1.shape
        y1 = paddle.reshape(y1, (-1, y1.shape[-1]))
        if not inverse:
            y1 = (y1 - self.y1_mean) / self.y1_std
            self.y1_flag = True
        else:
            y1 = y1 * self.y1_std + self.y1_mean
        y1 = paddle.reshape(y1, old_y1_shape)
        return y1

    def apply_y2(self, y2, device, inverse=False):
        self.y2_mean = self.y2_mean.to(device)
        self.y2_std = self.y2_std.to(device)

        old_y2_shape = y2.shape
        y2 = paddle.reshape(y2, (-1, y2.shape[-1]))
        if not inverse:
            y2 = (y2 - self.y2_mean) / self.y2_std
            self.y2_flag = True
        else:
            y2 = y2 * self.y2_std + self.y2_mean
        y2 = paddle.reshape(y2, old_y2_shape)
        return y2
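
The forward and inverse transforms of the Normalizer can be checked with a small NumPy round trip. This is a sketch of the arithmetic only: it does not model the x_flag/y1_flag/y2_flag bookkeeping or device placement, and the array sizes are made up.

```python
import numpy as np

rng = np.random.default_rng(0)
y2 = rng.normal(loc=5.0, scale=3.0, size=(4, 64, 1)).astype(np.float32)

# Statistics are taken per channel over all samples and points, as in Normalizer.__init__.
flat = y2.reshape(-1, y2.shape[-1])
mean = flat.mean(axis=0)
std = flat.std(axis=0, ddof=1) + 1e-8  # ddof=1 mirrors paddle.std's default unbiased estimate

y2_norm = (y2 - mean) / std     # forward transform (inverse=False)
y2_back = y2_norm * std + mean  # inverse transform (inverse=True)

max_roundtrip_error = float(np.abs(y2_back - y2).max())
```

After the forward transform each channel has approximately zero mean and unit standard deviation, and the inverse transform recovers the original values up to floating-point rounding.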

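During training, the __getitem__ method returns the inputs, labels, and corresponding weights of one sample by index, so the dataset plugs directly into the training pipeline.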

ppsci/data/dataset/latent_no_dataset.py
def __getitem__(self, index: int):
    input_item = {
        key: paddle.to_tensor(value[index], dtype="float32")
        for key, value in self.input_data.items()
    }

    label_item = {
        key: paddle.to_tensor(value[index], dtype="float32")
        for key, value in self.label_data.items()
    }

    weight_item = {}
    if self.weight_dict:
        for key in self.label_keys:
            if key in self.weight_dict:
                weight_item[key] = self.weight_dict[key]

    if self.transform_fn:
        input_item, label_item = self.transform_fn(input_item, label_item)

    return input_item, label_item, weight_item

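All data is stored in dictionaries following the PaddleScience convention: input provides the input tensors, label provides the supervision signal, and weight_dict lets the user assign weights to individual loss terms.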
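
The (input, label, weight) convention can be illustrated with a minimal stand-in class. ToyDataset below is hypothetical and written in NumPy; it only mimics the return structure of __getitem__ and omits the tensor conversion and transform hook of the real dataset class.

```python
import numpy as np


class ToyDataset:
    """Minimal stand-in that mimics the (input, label, weight) return convention."""

    def __init__(self, input_data, label_data, weight_dict=None):
        self.input_data = input_data
        self.label_data = label_data
        self.weight_dict = weight_dict or {}

    def __len__(self):
        return len(next(iter(self.label_data.values())))

    def __getitem__(self, index):
        input_item = {k: v[index] for k, v in self.input_data.items()}
        label_item = {k: v[index] for k, v in self.label_data.items()}
        # Only labels that have an entry in weight_dict receive a weight.
        weight_item = {k: w for k, w in self.weight_dict.items() if k in label_item}
        return input_item, label_item, weight_item


ds = ToyDataset(
    input_data={
        "x": np.zeros((4, 64, 2), np.float32),
        "y1": np.zeros((4, 64, 3), np.float32),
    },
    label_data={"y2": np.ones((4, 64, 1), np.float32)},
    weight_dict={"y2": 1.0},
)
inputs, labels, weights = ds[0]
print(sorted(inputs), sorted(labels), weights)  # ['x', 'y1'] ['y2'] {'y2': 1.0}
```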

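For time-dependent tasks the dataset is built in a similar way. The main difference is that LatentNODataset_time keeps x, y1, and y2 all in the input dictionary, so the model has direct access to time-dependent context during training, while the label remains y2 and supervises the final prediction. This design lets training capture temporal dependence and provides a convenient data interface for long-horizon rollout prediction.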

2.2 Model construction

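The latent neural operator consists of three stages: encoding, operator fitting in the latent space, and decoding. For steady-state tasks, the forward pass is written in PaddleScience as follows: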

ppsci/arch/latent_no.py
def forward(self, inputs: dict[str, paddle.Tensor]) -> dict[str, paddle.Tensor]:
    """
    Forward pass of LatentNO.

    Args:
        inputs (dict[str, paddle.Tensor]):
            Dictionary with keys:
                - "x": Trunk input tensor of shape (B, N, trunk_dim).
                - "y1": Branch input tensor of shape (B, N, branch_dim).

    Returns:
        dict[str, paddle.Tensor]: Dictionary containing:
            - "y2": Output tensor of shape (B, N, out_dim).
    """
    x = inputs[self.input_keys[0]]  # trunk input
    y = inputs[self.input_keys[1]]  # branch input

    x = self.trunk_mlp(x)
    y = self.branch_mlp(y)

    score = self.mode_mlp(x)
    score_encode = paddle.nn.functional.softmax(score, axis=1)
    score_decode = paddle.nn.functional.softmax(score, axis=-1)

    z = paddle.matmul(paddle.transpose(score_encode, perm=[0, 2, 1]), y)
    for block in self.attn_blocks:
        z = block(z)

    r = paddle.matmul(score_decode, z)
    r = self.out_mlp(r)

    return {self.output_keys[0]: r}

Figure: physics cross-attention module used in the encoding and decoding stages

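Encoding consists of input projection and input-function encoding. Input projection takes the observed function, given in the geometric space as a sequence of tuples of sampling position and physical value, and lifts each tuple to a higher-dimensional vector. The geometric space is the original space of the PDE input or output; it contains a number of sample points, each consisting of multi-dimensional spatial coordinates and multi-dimensional physical values. After input projection, the observed function lives in a space where non-local features are easier to capture. Input-function encoding then maps the projected data from the geometric space to the latent space. The model re-represents the input function with representation tokens at hypothetical sampling positions in the latent space. The number of these positions is far smaller than the number of sample points in the geometric space, which compresses the sequence. The encoding from geometric space to latent space is performed by physics cross-attention. The corresponding PaddleScience code is as follows: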

ppsci/arch/latent_no.py
x = inputs[self.input_keys[0]]  # trunk input
y = inputs[self.input_keys[1]]  # branch input

x = self.trunk_mlp(x)
y = self.branch_mlp(y)

score = self.mode_mlp(x)
score_encode = paddle.nn.functional.softmax(score, axis=1)
score_decode = paddle.nn.functional.softmax(score, axis=-1)

z = paddle.matmul(paddle.transpose(score_encode, perm=[0, 2, 1]), y)

ppsci/arch/latent_no.py
self.trunk_mlp = LatentMLP(trunk_dim, n_dim, n_dim, n_layer)
self.branch_mlp = LatentMLP(branch_dim, n_dim, n_dim, n_layer)
self.mode_mlp = LatentMLP(n_dim, n_dim, n_mode, n_layer)

ppsci/arch/latent_no.py
class LatentMLP(paddle.nn.Layer):
    """
    Multi-layer perceptron with residual connections used for trunk/branch/mode/out projections.

    Args:
        input_dim (int): Input feature dimension.
        hidden_dim (int): Hidden feature dimension.
        output_dim (int): Output feature dimension.
        n_layer (int): Number of hidden layers (residual blocks).

    Input:
        x (paddle.Tensor): shape (B, N, input_dim) or (..., input_dim)

    Returns:
        paddle.Tensor: shape (B, N, output_dim)
    """

    def __init__(
        self, input_dim: int, hidden_dim: int, output_dim: int, n_layer: int
    ) -> None:
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.n_layer = n_layer
        self.act = paddle.nn.GELU()
        self.input = paddle.nn.Linear(self.input_dim, self.hidden_dim)
        self.hidden = paddle.nn.LayerList(
            [
                paddle.nn.Linear(self.hidden_dim, self.hidden_dim)
                for _ in range(self.n_layer)
            ]
        )
        self.output = paddle.nn.Linear(self.hidden_dim, self.output_dim)

    def forward(self, x: paddle.Tensor) -> paddle.Tensor:
        """
        Args:
            x (paddle.Tensor): Input tensor of shape (B, N, input_dim).

        Returns:
            paddle.Tensor: Output tensor of shape (B, N, output_dim).
        """
        r = self.act(self.input(x))
        for i in range(0, self.n_layer):
            r = r + self.act(self.hidden[i](r))
        r = self.output(r)
        return r
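
The encoding step can be traced with plain NumPy to see the shapes involved. In the sketch below, random arrays stand in for the outputs of mode_mlp and branch_mlp, and the sizes (100 sample points, 8 latent modes, feature width 16) are toy values chosen for illustration.

```python
import numpy as np


def softmax(a, axis):
    a = a - a.max(axis=axis, keepdims=True)  # subtract the max for numerical stability
    e = np.exp(a)
    return e / e.sum(axis=axis, keepdims=True)


B, N, M, D = 2, 100, 8, 16  # batch, sample points, latent modes, feature width
rng = np.random.default_rng(0)
score = rng.normal(size=(B, N, M))  # stands in for mode_mlp(trunk_mlp(x))
y = rng.normal(size=(B, N, D))      # stands in for branch_mlp(y1)

# Normalize over the N sample points (axis=1): each latent mode gets weights that sum to 1.
score_encode = softmax(score, axis=1)

# (B, M, N) @ (B, N, D) -> (B, M, D): N point features are pooled into M latent tokens.
z = np.matmul(np.transpose(score_encode, (0, 2, 1)), y)

print(z.shape)  # (2, 8, 16)
```

Each latent token is a weighted average of the point features, with weights that depend only on the sampling positions, which is how a sequence of length N is compressed to length M.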

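After encoding, the sequence length drops to the number of hypothetical sampling positions in the latent space, so extracting and transforming features of the input function is much cheaper there than in the original geometric space. The model fits the solution operator of the PDE in the latent space with stacked Transformer layers, using self-attention as the kernel integral operator. Each layer aggregates information across the representation tokens at the hypothetical sampling positions and thereby transforms features of the input function into features of the output function. Fitting the solution operator on a shorter feature sequence makes the model more efficient on PDE problems, and it remains compatible with kernel integral operators of higher modeling capacity, which preserves solution accuracy. The stacked structure in the latent space is written in PaddleScience as follows: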

ppsci/arch/latent_no.py
for block in self.attn_blocks:
    z = block(z)

ppsci/arch/latent_no.py
self.attn_blocks = paddle.nn.Sequential(
    *[AttentionBlock(n_mode, n_dim, n_head) for _ in range(n_block)]
)

ppsci/arch/latent_no.py
class AttentionBlock(paddle.nn.Layer):
    """
    Transformer-style block: LayerNorm -> Self-Attention (residual) -> LayerNorm -> MLP (residual).

    Args:
        n_mode (int): Sequence length / number of modes (documentation).
        n_dim (int): Feature dimension D.
        n_head (int): Number of attention heads.

    Input:
        y (paddle.Tensor): shape (B, N, D)

    Returns:
        paddle.Tensor: shape (B, N, D)
    """

    def __init__(self, n_mode: int, n_dim: int, n_head: int) -> None:
        super().__init__()
        self.n_mode = n_mode
        self.n_dim = n_dim
        self.n_head = n_head

        self.self_attn = SelfAttention(
            self.n_mode, self.n_dim, self.n_head, Attention_Vanilla
        )

        self.ln1 = paddle.nn.LayerNorm(self.n_dim)
        self.ln2 = paddle.nn.LayerNorm(self.n_dim)

        self.mlp = paddle.nn.Sequential(
            paddle.nn.Linear(self.n_dim, self.n_dim * 2),
            paddle.nn.GELU(),
            paddle.nn.Linear(self.n_dim * 2, self.n_dim),
        )

    def forward(self, y: paddle.Tensor) -> paddle.Tensor:
        """
        Forward pass of the Transformer-style attention block.

        Args:
            y (paddle.Tensor): Input tensor of shape (B, N, D).

        Returns:
            paddle.Tensor: Output tensor of shape (B, N, D).
        """
        y1 = self.ln1(y)
        y = y + self.self_attn(y1)
        y2 = self.ln2(y)
        y = y + self.mlp(y2)
        return y
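
A rough count shows why running attention in the latent space is cheaper. The sizes below are hypothetical (4096 sample points, 256 latent tokens, 4 blocks) and are not taken from the example configs. The count covers only the pairwise attention scores, and it charges the latent route for the encode and decode weight matrices as well.

```python
def attention_pairs(seq_len):
    """Number of query-key pairs scored by one full self-attention layer."""
    return seq_len * seq_len


n_points = 4096  # hypothetical number of sample points in the geometric space
n_modes = 256    # hypothetical number of latent tokens
n_blocks = 4     # hypothetical number of stacked attention blocks

# Attention applied directly to the geometric sequence.
geometric_cost = n_blocks * attention_pairs(n_points)

# Latent route: encode (N*M weights) + blocks on M tokens + decode (N*M weights).
latent_cost = 2 * n_points * n_modes + n_blocks * attention_pairs(n_modes)

print(geometric_cost, latent_cost)  # 67108864 2359296
```

Under these assumed sizes the latent route scores roughly 28 times fewer pairs, and the gap widens as the number of sample points grows because the encode and decode costs are linear in it.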

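Decoding consists of output-function decoding and output projection. Output-function decoding maps the representation tokens at the hypothetical sampling positions, after they have passed through the stacked Transformer layers, back to the geometric space. The model applies physics cross-attention again: given the query positions of the output function, it decodes the latent representation sequence into representation vectors at the positions to be predicted. Output projection then projects these vectors to the low-dimensional physical values that form the prediction. The decoding code in PaddleScience is as follows: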

ppsci/arch/latent_no.py
r = paddle.matmul(score_decode, z)
r = self.out_mlp(r)

ppsci/arch/latent_no.py
self.out_mlp = LatentMLP(n_dim, n_dim, out_dim, n_layer)
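
The decoding product can be traced in NumPy in the same way as the encoding step. Random arrays stand in for the mode scores and for the latent tokens after the attention blocks; the sizes are toy values, and the final out_mlp projection is left out.

```python
import numpy as np


def softmax(a, axis):
    a = a - a.max(axis=axis, keepdims=True)  # subtract the max for numerical stability
    e = np.exp(a)
    return e / e.sum(axis=axis, keepdims=True)


B, N, M, D = 2, 100, 8, 16  # batch, query points, latent modes, feature width
rng = np.random.default_rng(0)
score = rng.normal(size=(B, N, M))  # stands in for mode_mlp(trunk_mlp(x))
z = rng.normal(size=(B, M, D))      # stands in for the latent tokens after attn_blocks

# Normalize over the M latent modes (axis=-1): each query point gets weights that sum to 1.
score_decode = softmax(score, axis=-1)

# (B, N, M) @ (B, M, D) -> (B, N, D): every query point mixes the M latent tokens.
r = np.matmul(score_decode, z)

print(r.shape)  # (2, 100, 16)
```

Encoding and decoding share the same score tensor and differ only in the softmax axis: over points when pooling into the latent space, over modes when reading back out.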

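For time-dependent data the overall model structure is unchanged, but to meet the requirements of PaddleScience's automatic training, the LatentNO_time class overrides the forward pass and implements a time-unrolled (autoregressive) procedure. Inside the time loop, LatentNO_time has two possible sources for the next-step input. During training with teacher forcing enabled, it uses the externally supplied y2 (the labels) and takes the aligned slice y2[..., t:t+step] as part of the next input. Otherwise, including at inference, it uses the model's own pred_step and sets stop_gradient=True on it to block gradient propagation across steps. With either source, the next current_y is updated as a sliding window: keep the trunk part, drop the earliest time slot(s), and append the new slice at the end. In PaddleScience this is written as follows: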

ppsci/arch/latent_no.py
def forward(self, inputs: dict[str, paddle.Tensor]) -> dict[str, paddle.Tensor]:
    """
    Forward pass of LatentNO_time.

    Args:
        inputs (dict[str, paddle.Tensor]):
            Dictionary with keys:
                - "x": Trunk input tensor of shape (B, N, trunk_dim).
                - "y1": Branch input tensor of shape (B, N, branch_dim).
                - "y2" (optional): Ground-truth sequence for teacher forcing (B, N, T).

    Returns:
        dict[str, paddle.Tensor]:
            - If time_unroll == False:
                {"y2": (B, N, out_dim)}
            - If time_unroll == True:
                {"y2": (B, N, T_total), "y2_steps": (B, N, step, num_steps)}
    """
    x = inputs[self.input_keys[0]]
    y1 = inputs[self.input_keys[1]]
    y2_gt = inputs.get(self.input_keys[2], None)  # optional ground truth

    # simple single-step (original behaviour)
    if not getattr(self, "time_unroll", False):
        r = self._single_step_predict(x, y1)
        return {"y2": r}

    # time-unroll (autoregressive) mode
    if self.T is None or self.step is None:
        raise ValueError("time_unroll enabled but model.T or model.step is None.")
    if not hasattr(self, "trunk_split") or self.trunk_split is None:
        raise ValueError("time_unroll enabled but model.trunk_split is not set.")

    current_y = y1
    pred_steps = []

    # iterate time: mimic original `for t in range(0, T, step)`
    for t in range(0, self.T, self.step):
        # predict one step
        pred_step = self._single_step_predict(x, current_y)

        # append for final concatenation
        pred_steps.append(pred_step)

        # - training + use_teacher_forcing -> use GT slice from inputs["y2"] (teacher forcing)
        # - otherwise -> use pred_step (autoregressive)
        if (
            self.training
            and getattr(self, "use_teacher_forcing", False)
            and (y2_gt is not None)
        ):
            # use GT slice (must exist and have time alignment)
            next_input_part = y2_gt[..., t : t + self.step]
        else:
            # use prediction as next input; make sure to block gradient so predictions don't backprop through time
            pred_step.stop_gradient = True
            next_input_part = pred_step

        # update current_y: keep trunk part, drop earliest step slot(s), append the next part
        left = current_y[..., : self.trunk_split]
        right = current_y[..., self.trunk_split + self.step :]
        current_y = paddle.concat((left, right, next_input_part), axis=-1)

    # final outputs: concat along time dimension (last dim of out is per-step time dim)
    pred_full = paddle.concat(pred_steps, axis=-1)
    pred_steps_stack = paddle.stack(pred_steps, axis=-1)

    return {
        self.output_keys[0]: pred_full,
        f"{self.output_keys[0]}_steps": pred_steps_stack,
    }
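
The sliding-window update of current_y is easiest to see on a tiny array. The sketch below assumes a layout of 2 trunk features followed by 3 past time slots and a step of 1; these numbers are chosen for illustration and are not read from the NS2d config.

```python
import numpy as np

trunk_split, step = 2, 1

# One sample point: 2 trunk features followed by 3 past time slots.
current_y = np.array([[[10.0, 20.0, 1.0, 2.0, 3.0]]])  # shape (B=1, N=1, 5)
next_input_part = np.array([[[4.0]]])                   # new slice, shape (1, 1, step)

left = current_y[..., :trunk_split]           # keep the trunk part
right = current_y[..., trunk_split + step:]   # drop the earliest time slot
current_y = np.concatenate((left, right, next_input_part), axis=-1)

print(current_y.tolist())  # [[[10.0, 20.0, 2.0, 3.0, 4.0]]]
```

The width of current_y stays constant across iterations, so the same network can be applied at every step of the rollout.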

In the training and evaluation functions, the model is instantiated as follows.

examples/LatentNO/LatentNO-steady.py
model = ppsci.arch.LatentNO(**cfg.MODEL)

examples/LatentNO/LatentNO-time.py
model = ppsci.arch.LatentNO_time(**cfg.MODEL)

2.3 Constraint construction

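This example uses supervised learning. Following the PaddleScience API structure, the supervised constraint is built with the built-in SupervisedConstraint, as shown below. The evaluation setup is similar; the difference is that some tasks require denormalization when the evaluation loss is computed, which is done by obtaining the training-set normalizer through sup_constraint.data_loader.dataset.normalizer and passing it to RelLpLoss.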

examples/LatentNO/LatentNO-steady.py
train_loss_fn = RelLpLoss(p=2, key="y2", normalizer=None)

sup_constraint = ppsci.constraint.SupervisedConstraint(
    train_dataloader_cfg,
    train_loss_fn,
    output_expr={"y2": lambda out: out["y2"]},
    name="SupTrain",
)
if cfg.data_normalize:
    normalizer = sup_constraint.data_loader.dataset.normalizer
else:
    normalizer = None
constraint = {sup_constraint.name: sup_constraint}

The loss function is the relative Lp loss. For steady-state tasks, RelLpLoss is defined as follows.

examples/LatentNO/utils.py
class RelLpLoss(base.Metric):
    def __init__(
        self,
        p: int,
        key: str = "y2",
        normalizer: Optional[object] = None,
        eps: float = 1e-12,
        keep_batch: bool = False,
    ):
        if keep_batch:
            raise ValueError(f"keep_batch should be False, but got {keep_batch}.")
        super(RelLpLoss, self).__init__(keep_batch)
        self.p = p
        self.key = key
        self.normalizer = normalizer
        self.eps = eps

    def forward(
        self,
        output_dict: Dict[str, paddle.Tensor],
        label_dict: Dict[str, paddle.Tensor],
        weight_dicts: Optional[Dict] = None,
    ) -> Dict[str, "paddle.Tensor"]:
        losses: Dict[str, paddle.Tensor] = {}
        for label_key in label_dict:
            pred_key = self.key if self.key in output_dict else label_key
            pred = output_dict[pred_key]
            target = label_dict[label_key]

            if self.normalizer is not None:
                pred = self.normalizer.apply_y2(pred, device="cpu", inverse=True)
                target = self.normalizer.apply_y2(target, device="cpu", inverse=True)

            error = paddle.sum(
                paddle.abs(pred - target) ** self.p,
                axis=tuple(range(1, len(pred.shape))),
            ) ** (1.0 / self.p)
            target_norm = paddle.sum(
                paddle.abs(target) ** self.p, axis=tuple(range(1, len(target.shape)))
            ) ** (1.0 / self.p)

            denom = target_norm.clip(min=self.eps)
            rloss = paddle.mean(error / denom)
            losses[label_key] = rloss

        return losses
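
The quantity computed by RelLpLoss can be reproduced on a two-point example. The function rel_lp below is a NumPy sketch of the same formula (per-sample Lp norm of the error divided by the Lp norm of the target, averaged over the batch); it leaves out the normalizer and the dictionary handling.

```python
import numpy as np


def rel_lp(pred, target, p=2, eps=1e-12):
    """Mean over the batch of ||pred - target||_p / ||target||_p."""
    axes = tuple(range(1, pred.ndim))  # reduce over every axis except the batch axis
    error = np.sum(np.abs(pred - target) ** p, axis=axes) ** (1.0 / p)
    norm = np.sum(np.abs(target) ** p, axis=axes) ** (1.0 / p)
    return float(np.mean(error / np.clip(norm, eps, None)))


target = np.array([[[3.0], [4.0]]])  # one sample, two points, ||target||_2 = 5
pred = np.array([[[3.0], [1.0]]])    # error vector (0, 3), ||error||_2 = 3

print(rel_lp(pred, target))  # 0.6
```

Because the error is divided by the norm of the target, samples with large and small magnitudes contribute on the same scale.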

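The time-dependent loss is likewise adapted to the automatic training framework. Through the use_full_sequence parameter, RelLpLoss_time uses the error accumulated over time steps for gradient backpropagation and the error of the full sequence, computed in one pass, as the evaluation metric.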

examples/LatentNO/utils.py
class RelLpLoss_time(base.Metric):
    def __init__(
        self,
        p: int,
        key: str = "y2",
        normalizer: Optional[object] = None,
        eps: float = 1e-12,
        keep_batch: bool = False,
        use_full_sequence: bool = True,
    ):
        if keep_batch:
            raise ValueError(f"keep_batch should be False, but got {keep_batch}.")
        super(RelLpLoss_time, self).__init__(keep_batch)
        self.p = p
        self.key = key
        self.normalizer = normalizer
        self.eps = eps
        self.use_full_sequence = use_full_sequence  # True: use full sequence loss; False: accumulate step-wise losses

    def forward(
        self,
        output_dict: Dict[str, paddle.Tensor],
        label_dict: Dict[str, paddle.Tensor],
        weight_dicts: Optional[Dict] = None,
    ) -> Dict[str, "paddle.Tensor"]:
        losses: Dict[str, paddle.Tensor] = {}
        for label_key in label_dict:
            if f"{self.key}_steps" in output_dict and not self.use_full_sequence:
                # Method 1: Accumulate losses at each timestep (matches backpropagation loss)
                pred_stack = output_dict[f"{self.key}_steps"]
                target_full = label_dict[label_key]
                step = pred_stack.shape[2]
                num_steps = pred_stack.shape[3]

                total_loss = paddle.to_tensor(0.0)
                for s in range(num_steps):
                    pred_s = pred_stack[..., s]
                    t_start = s * step
                    t_end = t_start + step
                    tgt_s = target_full[..., t_start:t_end]

                    if self.normalizer is not None:
                        pred_s = self.normalizer.apply_y2(pred_s, inverse=True)
                        tgt_s = self.normalizer.apply_y2(tgt_s, inverse=True)

                    # Compute Lp error for current timestep
                    error = paddle.sum(
                        paddle.abs(pred_s - tgt_s) ** self.p,
                        tuple(range(1, len(pred_s.shape))),
                    ) ** (1 / self.p)
                    target_norm = paddle.sum(
                        paddle.abs(tgt_s) ** self.p, tuple(range(1, len(tgt_s.shape)))
                    ) ** (1 / self.p)
                    step_loss = paddle.mean(error / target_norm)
                    total_loss = total_loss + step_loss

                losses[label_key] = total_loss

            else:
                # Method 2: Use full sequence loss
                pred_full = (
                    output_dict[self.key]
                    if self.key in output_dict
                    else output_dict[label_key]
                )
                target_full = label_dict[label_key]

                if self.normalizer is not None:
                    pred_full = self.normalizer.apply_y2(pred_full, inverse=True)
                    target_full = self.normalizer.apply_y2(target_full, inverse=True)

                error = paddle.sum(
                    paddle.abs(pred_full - target_full) ** self.p,
                    tuple(range(1, len(pred_full.shape))),
                ) ** (1 / self.p)
                target_norm = paddle.sum(
                    paddle.abs(target_full) ** self.p,
                    tuple(range(1, len(target_full.shape))),
                ) ** (1 / self.p)
                losses[label_key] = paddle.mean(error / target_norm)

        return losses
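
The two branches of RelLpLoss_time generally give different numbers for the same prediction. The NumPy sketch below uses one sample, one point, and two time steps (toy values) to compare the accumulated step-wise loss with the full-sequence loss; normalization is left out.

```python
import numpy as np


def rel_l2(pred, target):
    """Mean over the batch of ||pred - target||_2 / ||target||_2."""
    axes = tuple(range(1, pred.ndim))
    error = np.sqrt(np.sum((pred - target) ** 2, axis=axes))
    norm = np.sqrt(np.sum(target ** 2, axis=axes))
    return float(np.mean(error / norm))


step, num_steps = 1, 2
target = np.array([[[3.0, 4.0]]])        # (B=1, N=1, T=2)
pred_steps = np.array([[[[3.0, 2.0]]]])  # (B, N, step, num_steps)

# Method 1: sum the relative error of each time step (use_full_sequence=False).
stepwise = sum(
    rel_l2(pred_steps[..., s], target[..., s * step:(s + 1) * step])
    for s in range(num_steps)
)

# Method 2: one relative error over the concatenated sequence (use_full_sequence=True).
pred_full = np.concatenate([pred_steps[..., s] for s in range(num_steps)], axis=-1)
full = rel_l2(pred_full, target)

print(stepwise, full)  # 0.5 0.4
```

The step-wise sum normalizes each step by its own target norm, so an error in a step with a small target weighs more than it does in the full-sequence metric.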

2.4 Optimizer construction

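Training uses the AdamW optimizer. The learning rate settings come from the configuration file, and OneCycleLR controls the learning rate schedule. In PaddleScience this is written as follows: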

examples/LatentNO/LatentNO-steady.py
lr_scheduler = ppsci.optimizer.lr_scheduler.OneCycleLR(
    epochs=cfg.TRAIN.epochs,
    iters_per_epoch=cfg.TRAIN.iters_per_epoch,
    max_learning_rate=cfg.TRAIN.lr,
    divide_factor=cfg.TRAIN.div_factor,
    end_learning_rate=cfg.TRAIN.lr
    / cfg.TRAIN.div_factor
    / cfg.TRAIN.final_div_factor,
    phase_pct=cfg.TRAIN.pct_start,
)()

optimizer = ppsci.optimizer.AdamW(
    lr_scheduler,
    weight_decay=cfg.TRAIN.weight_decay,
    grad_clip=paddle.nn.ClipGradByNorm(clip_norm=cfg.TRAIN.clip_norm),
    beta1=cfg.TRAIN.beta0,
    beta2=cfg.TRAIN.beta1,
)(model)

metric_dict = {"L2Rel": RelLpLoss(p=2, key="y2", normalizer=normalizer)}
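
The way the scheduler arguments above relate to each other can be sketched with a simplified one-cycle schedule. The function below is an illustration with two cosine phases, not Paddle's implementation, and the numeric values in the usage lines (peak rate 1e-3, div_factor 25, final_div_factor 1e4, pct_start 0.3) are hypothetical rather than taken from the example configs.

```python
import math


def one_cycle_lr(step, total_steps, max_lr, div_factor, final_div_factor, pct_start):
    """Simplified two-phase cosine one-cycle schedule (a sketch, not Paddle's code)."""
    initial_lr = max_lr / div_factor                 # starting learning rate
    end_lr = max_lr / div_factor / final_div_factor  # same formula as end_learning_rate above
    warmup_steps = pct_start * total_steps           # share of steps spent increasing the rate

    def cos_anneal(start, end, pct):
        return end + (start - end) * (1.0 + math.cos(math.pi * pct)) / 2.0

    if step <= warmup_steps:
        return cos_anneal(initial_lr, max_lr, step / warmup_steps)
    return cos_anneal(max_lr, end_lr, (step - warmup_steps) / (total_steps - warmup_steps))


total = 1000  # epochs * iters_per_epoch
lr_start = one_cycle_lr(0, total, 1e-3, 25, 1e4, 0.3)
lr_peak = one_cycle_lr(300, total, 1e-3, 25, 1e4, 0.3)
lr_end = one_cycle_lr(total, total, 1e-3, 25, 1e4, 0.3)
```

With these assumed values the rate rises from 4e-5 to the peak of 1e-3 over the first 30% of steps and then decays to 4e-9.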

2.5 Model training

With the setup above complete, pass the instantiated objects to ppsci.solver.Solver in order and start training. In PaddleScience this is written as follows:

examples/LatentNO/LatentNO-steady.py
solver = ppsci.solver.Solver(
    model=model,
    optimizer=optimizer,
    constraint=constraint,
    validator=validator,
    cfg=cfg,
)

solver.train()

3. Complete code

examples/LatentNO/LatentNO-steady.py
import hydra
import paddle
from omegaconf import DictConfig
from utils import RelLpLoss

import ppsci


def train(cfg: DictConfig):
    model = ppsci.arch.LatentNO(**cfg.MODEL)

    train_dataloader_cfg = {
        "dataset": {
            "name": "LatentNODataset",
            "data_name": cfg.data_name,
            "data_mode": "train",
            "data_normalize": cfg.data_normalize,
            "data_concat": cfg.data_concat,
            "input_keys": ("x", "y1"),
            "label_keys": ("y2",),
        },
        "sampler": {"name": "BatchSampler", "drop_last": True, "shuffle": True},
        "batch_size": cfg.batch_size,
        "num_workers": cfg.get("num_workers", 0),
    }

    eval_dataloader_cfg = {
        "dataset": {
            "name": "LatentNODataset",
            "data_name": cfg.data_name,
            "data_mode": "val",
            "data_normalize": cfg.data_normalize,
            "data_concat": cfg.data_concat,
            "input_keys": ("x", "y1"),
            "label_keys": ("y2",),
        },
        "sampler": {"name": "BatchSampler", "drop_last": True, "shuffle": False},
        "batch_size": cfg.batch_size,
        "num_workers": cfg.get("num_workers", 0),
    }

    train_loss_fn = RelLpLoss(p=2, key="y2", normalizer=None)

    sup_constraint = ppsci.constraint.SupervisedConstraint(
        train_dataloader_cfg,
        train_loss_fn,
        output_expr={"y2": lambda out: out["y2"]},
        name="SupTrain",
    )
    if cfg.data_normalize:
        normalizer = sup_constraint.data_loader.dataset.normalizer
    else:
        normalizer = None
    constraint = {sup_constraint.name: sup_constraint}

    cfg.TRAIN.iters_per_epoch = len(sup_constraint.data_loader)
    lr_scheduler = ppsci.optimizer.lr_scheduler.OneCycleLR(
        epochs=cfg.TRAIN.epochs,
        iters_per_epoch=cfg.TRAIN.iters_per_epoch,
        max_learning_rate=cfg.TRAIN.lr,
        divide_factor=cfg.TRAIN.div_factor,
        end_learning_rate=cfg.TRAIN.lr
        / cfg.TRAIN.div_factor
        / cfg.TRAIN.final_div_factor,
        phase_pct=cfg.TRAIN.pct_start,
    )()

    optimizer = ppsci.optimizer.AdamW(
        lr_scheduler,
        weight_decay=cfg.TRAIN.weight_decay,
        grad_clip=paddle.nn.ClipGradByNorm(clip_norm=cfg.TRAIN.clip_norm),
        beta1=cfg.TRAIN.beta0,
        beta2=cfg.TRAIN.beta1,
    )(model)

    metric_dict = {"L2Rel": RelLpLoss(p=2, key="y2", normalizer=normalizer)}

    val_loss_fn = RelLpLoss(p=2, key="y2", normalizer=normalizer)

    sup_validator = ppsci.validate.SupervisedValidator(
        eval_dataloader_cfg,
        val_loss_fn,
        output_expr={"y2": lambda out: out["y2"]},
        metric=metric_dict,
        name="SupVal",
    )
    validator = {sup_validator.name: sup_validator}

    solver = ppsci.solver.Solver(
        model=model,
        optimizer=optimizer,
        constraint=constraint,
        validator=validator,
        cfg=cfg,
    )

    solver.train()
    solver.eval()


def evaluate(cfg: DictConfig):
    train_ds = ppsci.data.dataset.LatentNODataset(
        cfg.data_name,
        "train",
        cfg.data_normalize,
        cfg.data_concat,
        input_keys=("x", "y1"),
        label_keys=("y2",),
    )
    if cfg.data_normalize:
        normalizer = train_ds.normalizer
    else:
        normalizer = None

    eval_loss_fn = RelLpLoss(p=2, key="y2", normalizer=normalizer)

    model = ppsci.arch.LatentNO(**cfg.MODEL)

    eval_dataloader_cfg = {
        "dataset": {
            "name": "LatentNODataset",
            "data_name": cfg.data_name,
            "data_mode": "val",
            "data_normalize": cfg.data_normalize,
            "data_concat": cfg.data_concat,
            "input_keys": ("x", "y1"),
            "label_keys": ("y2",),
        },
        "sampler": {"name": "BatchSampler", "drop_last": True, "shuffle": False},
        "batch_size": cfg.batch_size,
        "num_workers": cfg.get("num_workers", 0),
    }

    metric_dict = {"L2Rel": RelLpLoss(p=2, key="y2", normalizer=normalizer)}

    validator = ppsci.validate.SupervisedValidator(
        eval_dataloader_cfg,
        eval_loss_fn,
        output_expr={"y2": lambda out: out["y2"]},
        metric=metric_dict,
        name="Evaluation",
    )

    solver = ppsci.solver.Solver(
        model=model,
        validator={"eval": validator},
        pretrained_model_path=cfg.EVAL.pretrained_model_path,
    )

    solver.eval()


@hydra.main(
    version_base=None, config_path="./config", config_name="LatentNO-Darcy.yaml"
)
def main(cfg: DictConfig):

    if cfg.mode == "train":
        train(cfg)
    elif cfg.mode == "eval":
        evaluate(cfg)
    else:
        raise ValueError(f"cfg.mode should be in ['train', 'eval'], but got '{cfg.mode}'")


if __name__ == "__main__":
    main()

4. Results

The performance of the latent neural operator on several PDE forward problems is shown below.

Figure: performance of the latent neural operator on several PDE forward problems

5. References

[1] Wang T, Wang C. Latent neural operator for solving forward and inverse PDE problems. Advances in Neural Information Processing Systems, 2024, 37: 33085-33107.