跳转至

SPINN(helmholtz3d)

AI Studio快速体验

python helmholtz3d.py
python helmholtz3d.py mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/spinn/spinn_helmholtz3d_pretrained.pdparams
python helmholtz3d.py mode=export
python helmholtz3d.py mode=infer
预训练模型 指标
spinn_helmholtz3d.pdparams l2_err: 0.0183
rmse: 0.0064

1. 背景简介

Helmholtz方程是一个重要的偏微分方程,广泛应用于物理学和工程学中,特别是在波动理论和振动问题中。它以德国物理学家赫尔曼·冯·亥姆霍兹(Hermann von Helmholtz)的名字命名。Helmholtz方程的标准形式如下:

\[ \nabla^2 u + k^2 u = q \]

这里:

  • \(\nabla^2\) 是拉普拉斯算子(也称为拉普拉斯算符),在三维直角坐标系下,它的形式是:\(\nabla^2 = \frac{\partial^2 }{\partial x^2} + \frac{\partial^2 }{\partial y^2} + \frac{\partial^2 }{\partial z^2}\)
  • \(u\) 是待求解的函数,通常表示物理量的幅度,如电磁场、声压或量子波函数等。
  • \(k\) 是波数,定义为 \(k = \frac{2\pi}{\lambda}\),其中 \(\lambda\) 是波长。
  • \(q\) 是源项,通常表示物理量与时间、空间导数之间的相互作用。

本案例解决以下三维 Helmholtz 方程:

\[ \begin{aligned} & \nabla^2 u + k^2 u = q, x \in \Omega \\ & u(x) = 0, x \in \partial \Omega \\ \end{aligned} \]
\[ \begin{aligned} & \text{source term } q = -(a_1 \pi)^2 u -(a_2 \pi)^2 u -(a_3 \pi)^2 u + k^2 u \\ & \text{where }k=1, a_1=4, a_2=4, a_3=3 \end{aligned} \]

2. 问题定义

本问题的计算域在 \([-1, 1] ^3\) 一个单位正方体内,对于计算域内部点,要求满足上述 Helmholtz 方程,对于计算域边界点,要求满足 \(u = 0\)

3. 问题求解

接下来开始讲解如何将问题一步一步地转化为 PaddleScience 代码,用深度学习的方法求解该问题。 为了快速理解 PaddleScience,接下来仅对模型构建、方程构建、计算域构建等关键步骤进行阐述,而其余细节请参考 API文档

3.1 模型构建

SPINN 的模型结构设计如下:

SPINN_structure

在 Helmholtz 问题中,每一个已知的坐标点 \((x, y, z)\) 都有对应的待求解的未知量 \(u\)(此处我们用 \(u\)代替),在这里使用 SPINN 来表示 \((x, y, z)\)\((u)\) 的映射函数 \(f: \mathbb{R}^3 \to \mathbb{R}^1\) ,即:

\[ u = m(x, y, z) \]

上式中 \(m\) 即为 SPINN 模型本身,用 PaddleScience 代码表示如下

# set model
model = ppsci.arch.SPINN(**cfg.MODEL)

为了在计算时,准确快速地访问具体变量的值,在这里指定网络模型的输入变量名是 ("x", "y", "z"),输出变量名是 ("u"),这些命名与后续代码保持一致。

接着通过指定 SPINN 的层数、神经元个数,就实例化出了一个拥有 4 层全连接层,每层全连接层的神经元个数为 64 ,每一个输出变量的隐层特征维度 r 为 32 的神经网络模型 model, 并且使用 tanh 作为激活函数。

# model settings
MODEL:
  input_keys: ["x", "y", "z"]
  output_keys: ["u"]
  num_layers: 4
  hidden_size: 64
  r: 32
  activation: "tanh"

3.2 方程构建

Helmholtz 微分方程可以用如下代码表示:

# set equation
equation = {"Helmholtz": ppsci.equation.Helmholtz(3, 1.0, model)}

注:此处我们需要把 model 手动传递给 equation["Helmholtz"],因为 Helmholtz 方程需要用到前向微分功能。

3.3 约束构建

3.3.1 内部点约束

以作用在内部点上的 SupervisedConstraint 为例,用于生成内部点训练数据的代码如下:

def _helmholtz3d_exact_u(a1, a2, a3, x, y, z):
    return np.sin(a1 * np.pi * x) * np.sin(a2 * np.pi * y) * np.sin(a3 * np.pi * z)


def _helmholtz3d_source_term(a1, a2, a3, x, y, z, lda=1.0):
    u_gt = _helmholtz3d_exact_u(a1, a2, a3, x, y, z)[..., None]
    uxx = -((a1 * np.pi) ** 2) * u_gt
    uyy = -((a2 * np.pi) ** 2) * u_gt
    uzz = -((a3 * np.pi) ** 2) * u_gt
    return uxx + uyy + uzz + lda * u_gt


def generate_train_helmholtz3d(a1, a2, a3, nc):
    xc = np.random.uniform(-1.0, 1.0, [nc, 1]).astype(dtype)
    yc = np.random.uniform(-1.0, 1.0, [nc, 1]).astype(dtype)
    zc = np.random.uniform(-1.0, 1.0, [nc, 1]).astype(dtype)
    # source term
    xcm, ycm, zcm = np.meshgrid(xc, yc, zc, indexing="ij")
    uc = _helmholtz3d_source_term(a1, a2, a3, xcm, ycm, zcm).astype(dtype)
    # boundary (hard-coded)
    xb = [
        np.asarray([[1.0]], dtype=dtype),
        np.asarray([[-1.0]], dtype=dtype),
        xc,
        xc,
        xc,
        xc,
    ]
    yb = [
        yc,
        yc,
        np.asarray([[1.0]], dtype=dtype),
        np.asarray([[-1.0]], dtype=dtype),
        yc,
        yc,
    ]
    zb = [
        zc,
        zc,
        zc,
        zc,
        np.asarray([[1.0]], dtype=dtype),
        np.asarray([[-1.0]], dtype=dtype),
    ]
    return xc, yc, zc, uc, xb, yb, zb

用于构建内部点约束的代码如下:

class InteriorDataGenerator:
    def __init__(self):
        self.iter = 0
        self._gen()

    def _gen(self):
        global xb, yb, zb
        xc, yc, zc, uc, xb, yb, zb = generate_train_helmholtz3d(
            cfg.a1,
            cfg.a2,
            cfg.a3,
            cfg.TRAIN.nc,
        )
        self.xc = xc
        self.yc = yc
        self.zc = zc
        self.uc = uc

    def __call__(self):
        self.iter += 1

        if self.iter % 100 == 0:
            self._gen()

        return {
            "x": self.xc,
            "y": self.yc,
            "z": self.zc,
            "uc": self.uc,
        }

def gen_label_batch(input_batch):
    return {"helmholtz": input_batch["uc"]}

pde_constraint = ppsci.constraint.SupervisedConstraint(
    {
        "dataset": {
            "name": "ContinuousNamedArrayDataset",
            "input": InteriorDataGenerator(),
            "label": gen_label_batch,
        },
    },
    output_expr=equation["Helmholtz"].equations,
    loss=ppsci.loss.MSELoss("mean"),
    name="PDE",
)
# wrap constraints together
constraint = {
    pde_constraint.name: pde_constraint,
}

SupervisedConstraint 的第一个参数是用于训练的数据配置,由于我们使用实时随机生成的数据,而不是固定数据点,因此填入自定义的输入数据/标签生成函数;

第二个参数是方程表达式,因此传入 Helmholtz 的方程对象;

第三个参数是损失函数,此处选用 MSELoss 即可;

第四个参数是约束条件的名字,需要给每一个约束条件命名,方便后续对其索引。此处命名为 "PDE" 即可。

3.3.2 边值约束

第三个约束条件是边值约束,代码如下:

    def __init__(self, idx: int):
        self.idx = idx

    def __call__(self):
        global xb, yb, zb
        tmp = {
            "x": xb[self.idx],
            "y": yb[self.idx],
            "z": zb[self.idx],
        }
        return tmp

def gen_bc_label(data_dict):
    nx = len(data_dict["x"])
    ny = len(data_dict["y"])
    nz = len(data_dict["z"])
    return {"u": np.zeros([nx, ny, nz, 1], dtype=dtype)}

for i in range(6):
    bc_constraint_i = ppsci.constraint.SupervisedConstraint(
        {
            "dataset": {
                "name": "ContinuousNamedArrayDataset",
                "input": BCDataGenerator(i),
                "label": gen_bc_label,
            },
        },
        output_expr={"u": lambda out: out["u"]},
        loss=ppsci.loss.MSELoss("mean"),
        name=f"BC{i}",
    )
    constraint[bc_constraint_i.name] = bc_constraint_i

3.4 超参数设定

接下来需要指定训练轮数和学习率,此处按实验经验,使用 50 轮训练轮数,每轮迭代 1000 步,0.001 的初始学习率。

# training settings
TRAIN:
  epochs: 50
  iters_per_epoch: 1000
  save_freq: 10
  eval_during_train: false
  eval_freq: 5
  lr_scheduler:
    epochs: ${TRAIN.epochs}
    iters_per_epoch: ${TRAIN.iters_per_epoch}
    learning_rate: 1.0e-3
    gamma: 0.9
    decay_steps: 1000
    by_epoch: false
  nc: 64
  pretrained_model_path: null
  checkpoint_path: null

3.5 优化器构建

训练过程会调用优化器来更新模型参数,此处选择较为常用的 Adam 优化器,并配合使用机器学习中常用的 ExponentialDecay 学习率调整策略。

lr_scheduler = ppsci.optimizer.lr_scheduler.ExponentialDecay(
    **cfg.TRAIN.lr_scheduler
)()
optimizer = ppsci.optimizer.Adam(lr_scheduler)(model)

3.6 模型训练、评估与可视化

完成上述设置之后,只需要将上述实例化的对象按顺序传递给 ppsci.solver.Solver,然后启动训练、评估、可视化。

solver = ppsci.solver.Solver(
    model,
    constraint,
    optimizer=optimizer,
    equation=equation,
    cfg=cfg,
)
# train model
solver.train()

# evaluate after training
x, y, z, u_gt = generate_test_helmholtz3d(cfg.a1, cfg.a2, cfg.a3, cfg.EVAL.nc)
u_pred = solver.predict(
    {
        "x": x,
        "y": y,
        "z": z,
    },
    batch_size=None,
    return_numpy=True,
)["u"].reshape(-1)
u_gt = u_gt.reshape(-1)
l2_err = np.linalg.norm(u_pred - u_gt, ord=2) / np.linalg.norm(u_gt, ord=2)
rmse = np.sqrt(np.mean((u_pred - u_gt) ** 2))
logger.message(f"l2_err = {l2_err:.4f}, rmse = {rmse:.4f}")

save_result(
    osp.join(cfg.output_dir, "helmholtz3d_result.vtu"), x, y, z, u_pred, u_gt
)

4. 完整代码

helmholtz3d.py
"""
Reference: https://github.com/stnamjef/SPINN/blob/main/helmholtz3d.py
"""

from os import path as osp

import hydra
import numpy as np
import paddle
from omegaconf import DictConfig

import ppsci
from ppsci.utils import logger

dtype = paddle.get_default_dtype()


def save_result(filename, x, y, z, u_pred, u_ref):
    xm, ym, zm = np.meshgrid(x, y, z, indexing="ij")
    xm = xm.reshape(-1, 1)
    ym = ym.reshape(-1, 1)
    zm = zm.reshape(-1, 1)
    u_pred = u_pred.reshape(-1, 1)
    u_ref = u_ref.reshape(-1, 1)
    ppsci.visualize.save_vtu_from_dict(
        filename,
        {
            "x": xm,
            "y": ym,
            "z": zm,
            "u_pred": u_pred,
            "u_ref": u_ref,
        },
        ("x", "y", "z"),
        ("u_pred", "u_ref"),
    )


def _helmholtz3d_exact_u(a1, a2, a3, x, y, z):
    return np.sin(a1 * np.pi * x) * np.sin(a2 * np.pi * y) * np.sin(a3 * np.pi * z)


def _helmholtz3d_source_term(a1, a2, a3, x, y, z, lda=1.0):
    u_gt = _helmholtz3d_exact_u(a1, a2, a3, x, y, z)[..., None]
    uxx = -((a1 * np.pi) ** 2) * u_gt
    uyy = -((a2 * np.pi) ** 2) * u_gt
    uzz = -((a3 * np.pi) ** 2) * u_gt
    return uxx + uyy + uzz + lda * u_gt


def generate_train_helmholtz3d(a1, a2, a3, nc):
    xc = np.random.uniform(-1.0, 1.0, [nc, 1]).astype(dtype)
    yc = np.random.uniform(-1.0, 1.0, [nc, 1]).astype(dtype)
    zc = np.random.uniform(-1.0, 1.0, [nc, 1]).astype(dtype)
    # source term
    xcm, ycm, zcm = np.meshgrid(xc, yc, zc, indexing="ij")
    uc = _helmholtz3d_source_term(a1, a2, a3, xcm, ycm, zcm).astype(dtype)
    # boundary (hard-coded)
    xb = [
        np.asarray([[1.0]], dtype=dtype),
        np.asarray([[-1.0]], dtype=dtype),
        xc,
        xc,
        xc,
        xc,
    ]
    yb = [
        yc,
        yc,
        np.asarray([[1.0]], dtype=dtype),
        np.asarray([[-1.0]], dtype=dtype),
        yc,
        yc,
    ]
    zb = [
        zc,
        zc,
        zc,
        zc,
        np.asarray([[1.0]], dtype=dtype),
        np.asarray([[-1.0]], dtype=dtype),
    ]
    return xc, yc, zc, uc, xb, yb, zb


def generate_test_helmholtz3d(a1, a2, a3, nc_test):
    x = np.linspace(-1.0, 1.0, nc_test, dtype=dtype)
    y = np.linspace(-1.0, 1.0, nc_test, dtype=dtype)
    z = np.linspace(-1.0, 1.0, nc_test, dtype=dtype)
    xm, ym, zm = np.meshgrid(x, y, z, indexing="ij")
    u_gt = _helmholtz3d_exact_u(a1, a2, a3, xm, ym, zm).astype(dtype)[..., None]
    x = x.reshape(-1, 1)
    y = y.reshape(-1, 1)
    z = z.reshape(-1, 1)
    return x, y, z, u_gt


def train(cfg: DictConfig):
    # set model
    model = ppsci.arch.SPINN(**cfg.MODEL)

    # set equation
    equation = {"Helmholtz": ppsci.equation.Helmholtz(3, 1.0, model)}

    # set constraint
    class InteriorDataGenerator:
        def __init__(self):
            self.iter = 0
            self._gen()

        def _gen(self):
            global xb, yb, zb
            xc, yc, zc, uc, xb, yb, zb = generate_train_helmholtz3d(
                cfg.a1,
                cfg.a2,
                cfg.a3,
                cfg.TRAIN.nc,
            )
            self.xc = xc
            self.yc = yc
            self.zc = zc
            self.uc = uc

        def __call__(self):
            self.iter += 1

            if self.iter % 100 == 0:
                self._gen()

            return {
                "x": self.xc,
                "y": self.yc,
                "z": self.zc,
                "uc": self.uc,
            }

    def gen_label_batch(input_batch):
        return {"helmholtz": input_batch["uc"]}

    pde_constraint = ppsci.constraint.SupervisedConstraint(
        {
            "dataset": {
                "name": "ContinuousNamedArrayDataset",
                "input": InteriorDataGenerator(),
                "label": gen_label_batch,
            },
        },
        output_expr=equation["Helmholtz"].equations,
        loss=ppsci.loss.MSELoss("mean"),
        name="PDE",
    )
    # wrap constraints together
    constraint = {
        pde_constraint.name: pde_constraint,
    }

    class BCDataGenerator:
        def __init__(self, idx: int):
            self.idx = idx

        def __call__(self):
            global xb, yb, zb
            tmp = {
                "x": xb[self.idx],
                "y": yb[self.idx],
                "z": zb[self.idx],
            }
            return tmp

    def gen_bc_label(data_dict):
        nx = len(data_dict["x"])
        ny = len(data_dict["y"])
        nz = len(data_dict["z"])
        return {"u": np.zeros([nx, ny, nz, 1], dtype=dtype)}

    for i in range(6):
        bc_constraint_i = ppsci.constraint.SupervisedConstraint(
            {
                "dataset": {
                    "name": "ContinuousNamedArrayDataset",
                    "input": BCDataGenerator(i),
                    "label": gen_bc_label,
                },
            },
            output_expr={"u": lambda out: out["u"]},
            loss=ppsci.loss.MSELoss("mean"),
            name=f"BC{i}",
        )
        constraint[bc_constraint_i.name] = bc_constraint_i

    # set optimizer
    lr_scheduler = ppsci.optimizer.lr_scheduler.ExponentialDecay(
        **cfg.TRAIN.lr_scheduler
    )()
    optimizer = ppsci.optimizer.Adam(lr_scheduler)(model)

    # initialize solver
    solver = ppsci.solver.Solver(
        model,
        constraint,
        optimizer=optimizer,
        equation=equation,
        cfg=cfg,
    )
    # train model
    solver.train()

    # evaluate after training
    x, y, z, u_gt = generate_test_helmholtz3d(cfg.a1, cfg.a2, cfg.a3, cfg.EVAL.nc)
    u_pred = solver.predict(
        {
            "x": x,
            "y": y,
            "z": z,
        },
        batch_size=None,
        return_numpy=True,
    )["u"].reshape(-1)
    u_gt = u_gt.reshape(-1)
    l2_err = np.linalg.norm(u_pred - u_gt, ord=2) / np.linalg.norm(u_gt, ord=2)
    rmse = np.sqrt(np.mean((u_pred - u_gt) ** 2))
    logger.message(f"l2_err = {l2_err:.4f}, rmse = {rmse:.4f}")

    save_result(
        osp.join(cfg.output_dir, "helmholtz3d_result.vtu"), x, y, z, u_pred, u_gt
    )


def evaluate(cfg: DictConfig):
    # set model
    model = ppsci.arch.SPINN(**cfg.MODEL)

    solver = ppsci.solver.Solver(
        model,
        cfg=cfg,
    )

    # evaluate
    x, y, z, u_gt = generate_test_helmholtz3d(cfg.a1, cfg.a2, cfg.a3, cfg.EVAL.nc)
    u_pred = solver.predict(
        {
            "x": x,
            "y": y,
            "z": z,
        },
        batch_size=None,
        return_numpy=True,
    )["u"].reshape(-1)
    u_gt = u_gt.reshape(-1)
    l2_err = np.linalg.norm(u_pred - u_gt, ord=2) / np.linalg.norm(u_gt, ord=2)
    rmse = np.sqrt(np.mean((u_pred - u_gt) ** 2))
    logger.message(f"l2_err = {l2_err:.4f}, rmse = {rmse:.4f}")

    save_result(
        osp.join(cfg.output_dir, "helmholtz3d_result.vtu"), x, y, z, u_pred, u_gt
    )


def export(cfg: DictConfig):
    # set model
    model = ppsci.arch.SPINN(**cfg.MODEL)

    # initialize solver
    solver = ppsci.solver.Solver(model, cfg=cfg)
    # export model
    from paddle.static import InputSpec

    input_spec = [
        {key: InputSpec([None, 1], "float32", name=key) for key in model.input_keys},
    ]
    solver.export(input_spec, cfg.INFER.export_path, with_onnx=False)


def inference(cfg: DictConfig):
    from deploy.python_infer import pinn_predictor

    predictor = pinn_predictor.PINNPredictor(cfg)

    # evaluate
    x, y, z, u_gt = generate_test_helmholtz3d(cfg.a1, cfg.a2, cfg.a3, cfg.EVAL.nc)
    output_dict = predictor.predict(
        {
            "x": x,
            "y": y,
            "z": z,
        },
        batch_size=None,
    )
    # mapping data to cfg.INFER.output_keys
    output_dict = {
        store_key: output_dict[infer_key]
        for store_key, infer_key in zip(cfg.MODEL.output_keys, output_dict.keys())
    }
    u_pred = output_dict["u"].reshape(-1)
    u_gt = u_gt.reshape(-1)
    l2_err = np.linalg.norm(u_pred - u_gt, ord=2) / np.linalg.norm(u_gt, ord=2)
    rmse = np.sqrt(np.mean((u_pred - u_gt) ** 2))
    logger.message(f"l2_err = {l2_err:.4f}, rmse = {rmse:.4f}")

    save_result(
        osp.join(cfg.output_dir, "helmholtz3d_result.vtu"), x, y, z, u_pred, u_gt
    )


@hydra.main(version_base=None, config_path="./conf", config_name="helmholtz3d.yaml")
def main(cfg: DictConfig):
    if cfg.mode == "train":
        train(cfg)
    elif cfg.mode == "eval":
        evaluate(cfg)
    elif cfg.mode == "export":
        export(cfg)
    elif cfg.mode == "infer":
        inference(cfg)
    else:
        raise ValueError(
            f"cfg.mode should in ['train', 'eval', 'export', 'infer'], but got '{cfg.mode}'"
        )


if __name__ == "__main__":
    main()

5. 结果展示

在计算域上均匀采样出 \(100^3\) 个点,其预测结果和解析解如下图所示。

spinn_helmholtz3d.jpg

左侧为 PaddleScience 预测结果,右侧为解析解结果

本问题使用模型预测的误差为 l2_err = 0.0183,rmse = 0.0064,与解析解误差较小,基本一致。

6. 参考资料