跳转至

CVit(Navier-Stokes)

AI Studio快速体验

# download data
git lfs install
git clone https://huggingface.co/datasets/pdearena/NavierStokes-2D
python ns_cvit.py
# download data
git lfs install
git clone https://huggingface.co/datasets/pdearena/NavierStokes-2D
python ns_cvit.py mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/cvit/ns_cvit_pretrained.pdparams
pip install einops==0.8.1 # Check if einops has been updated to 0.8.1, see: https://github.com/arogozhnikov/einops/pull/353
python ns_cvit.py mode=export
pip install einops==0.8.1 # Check if einops has been updated to 0.8.1, see: https://github.com/arogozhnikov/einops/pull/353
# download data
git lfs install
git clone https://huggingface.co/datasets/pdearena/NavierStokes-2D
python ns_cvit.py mode=infer
预训练模型 指标
ns_cvit_pretrained.pdparams 4-step l2_error: 0.0396

1. 背景简介

sciml 领域所使用的模型现阶段与CV、NLP领域的先进模型有较大差别,并没有很好地利用好这些先进模型所提供的优势。因此论文作者首先提出了一个算子学习的统一视角,按照 Global conditioning 和 Local Conditioning 分别对 DeepONet、FNO、GNO 等模型进行了归纳与总结,然后基于目前广泛应用于CV、NLP领域的 Transformer 结构设计了一种 Global conditioning 的模型 CVit。相比以往的算子学习模型,参数量更小,精度更高。

模型结构如下图所示:

Cvit

2. 问题定义

CVit 作为一种算子学习模型,以输入函数 \(u\)、函数 \(s\) 的查询点 query coordinate \(y\) 为输入,输出经过算子映射后的函数,在查询点 \(y\) 处的函数值 \(s(y)\)

本问题基于固定方腔的不可压 buoyancy-driven flow 即方腔内的浮力驱动流动问题,求解如下方程:

Formulation We consider the vorticity-stream \((\omega-\psi)\) formulation of the incompressible Navier-Stokes equations on a two-dimensional periodic domain, \(D=D_u=D_v=[0,2 \pi]^2\) :

\[ \begin{aligned} & \frac{\partial \omega}{\partial t}+(v \cdot \nabla) \omega-v \Delta \omega=f^{\prime} \\ & \omega=-\Delta \psi \quad \int_D \psi=0, \\ & v=\left(\frac{\partial \psi}{\partial x_2},-\frac{\partial \psi}{\partial x_1}\right) \end{aligned} \]

3. 问题求解

接下来开始讲解如何将问题一步一步地转化为 PaddleScience 代码,用深度学习的方法求解该问题。 为了快速理解 PaddleScience,接下来仅对模型构建、方程构建、计算域构建等关键步骤进行阐述,而其余细节请参考 API文档

3.1 模型构建

在本问题中,对于每一个函数 \(u\),其经过算子学习模型映射到 \(s\) 后,在 \(y\) 上都有对应的标签 \(s(y)\),因此在这里使用 CVit 来表示 \((u, y)\)\(s(y)\) 的映射关系:

\[ s(y) = G(u)(y) \]

上式中 \(G(u)\) 即为 CVit 模型本身,用 PaddleScience 代码表示如下

# set model
model = ppsci.arch.CVit(**cfg.MODEL)

为了在计算时,准确快速地访问具体变量的值,在这里指定网络模型的输入变量名是 ("u", "y"),输出变量名是 ("s"),这些命名与后续代码保持一致。

接着通过指定 CVit 的输入维度、坐标维度、输出维度、模型层数等超参数,就可实例化出了一个 model

# model settings
MODEL:
  input_keys: [u, y]
  output_keys: [s]
  in_dim: 3
  coords_dim: 2
  spatial_dims: [10, 128, 128] # t, h, w
  grid_size: [128, 128]
  latent_dim: 512
  emb_dim: 384
  patch_size: [1, 8, 8]
  depth: 5
  num_heads: 6
  dec_emb_dim: 512
  dec_num_heads: 16
  dec_depth: 1
  num_mlp_layers: 1
  mlp_ratio: 1
  out_dim: 3
  embedding_type: grid

3.2 数据准备

本问题中的数据分片储存在 NavierStokes2D/*.h5 文件中,分为训练和测试集,其数据包含内容如下表所示(这些信息会在运行时打印出来)。

文件名 文件数量 数据形状 输入形状 标签形状
NavierStokes2D_train_*.h5 52 [1000, 14, 128, 128, 3] [4000, 10, 128, 128, 3] [4000, 1, 128, 128, 3]
NavierStokes2D_test_*.h5 41 [5200, 14, 128, 128, 3] [20800, 10, 128, 128, 3] [20800, 1, 128, 128, 3]

数据读取函数如下:

# Construct the full dataset
def prepare_ns_dataset(
    directory: str,
    mode: str,
    keys: Sequence[str],
    prev_steps: int,
    pred_steps: int,
    num_samples: int,
    downsample: int = 1,
):
    # Use list comprehension for efficiency
    file_names = [
        osp.join(directory, f)
        for f in os.listdir(directory)
        if re.match(f"^NavierStokes2D_{mode}", f)
    ]

    # Initialize dictionaries to hold the inputs and outputs
    data_dict = {key: [] for key in keys}

    num_files = len(file_names)

    f = h5py.File(file_names[0], "r")
    s = f[mode][keys[0]].shape[0]
    for i in tqdm.trange(min(num_files, num_samples // s + 1), desc="Reading files"):
        with h5py.File(file_names[i], "r") as f:
            data_group = f[mode]

            for key in keys:
                # Use memory-mapping to reduce memory usage
                data_dict[key].append(np.array(data_group[key], dtype=dtype))

    for key in keys:
        data_dict[key] = np.vstack(data_dict[key])

    data = np.concatenate(
        [np.expand_dims(arr, axis=-1) for arr in data_dict.values()], axis=-1
    )
    data = data[:num_samples, :, ::downsample, ::downsample, :]

    # Use sliding window to generate inputs and outputs
    sliding_data = sliding_window_view(
        data, window_shape=prev_steps + pred_steps, axis=1
    )
    sliding_data = einops.rearrange(sliding_data, "n m h w c s -> (n m) s h w c")

    inputs = sliding_data[:, :prev_steps, ...]
    outputs = sliding_data[:, prev_steps : prev_steps + pred_steps, ...]

    return inputs, outputs  # (B, T, H, W, C) (B, T', H, W, C)

训练、测试时采用前 10 个时刻预测下一个时刻,并且测试时会以自回归的形式连续预测 4 个时刻。

3.3 约束构建

3.3.1 监督约束

在训练时,随机选取 batch_size 组来自 \(u\) 上的数据、并同时随机选取 query_point\(y\) 坐标,如此构成了训练输入数据,标签数据则从 \(s\) 中随机选取同样的 batch_size x query_point 个标签点。

# set constraint
def random_query(
    input_dict: Dict[str, np.ndarray],
    label_dict: Dict[str, np.ndarray],
    weight_dict: Dict[str, np.ndarray],
) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray]]:
    y_key = cfg.MODEL.input_keys[1]
    s_key = cfg.MODEL.output_keys[0]
    # random select coords and labels
    npos = input_dict[y_key].shape[1]
    assert cfg.TRAIN.num_query_points <= npos, (
        f"Number of query points({cfg.TRAIN.num_query_points}) must be "
        f"less than or equal to number of positions({npos})."
    )
    random_pos = np.random.choice(npos, cfg.TRAIN.num_query_points, replace=False)
    input_dict[y_key] = input_dict[y_key][0, random_pos]
    label_dict[s_key] = label_dict[s_key][:, random_pos]
    return (input_dict, label_dict, weight_dict)

sup_constraint = ppsci.constraint.SupervisedConstraint(
    {
        "dataset": {
            "name": "NamedArrayDataset",
            "input": {"u": train_inputs, "y": train_coords},
            "label": {"s": train_outputs},
            "transforms": [
                {
                    "FunctionalTransform": {
                        "transform_func": random_query,
                    },
                },
            ],
        },
        "batch_size": cfg.TRAIN.batch_size,
        "auto_collation": False,  # NOTE: Explicitly disable auto collation
    },
    output_expr={"s": lambda out: out["s"]},
    loss=ppsci.loss.MSELoss("mean"),
    name="Sup",
)
# wrap constraints together
constraint = {sup_constraint.name: sup_constraint}

SupervisedConstraint 的第一个参数是用于训练的数据配置,我们使用 NamedArrayDataset 作为数据集类型,并且传入自定义的random_query作为transforms,完成上述的样本随机选取过程;

第二个参数是该约束的计算表达式,我们只需要计算 \(s\) 即可,因此填入一个不做任何处理,直接取出模型输出结果"s"的匿名表达式;

第三个参数是损失函数,此处选用 MSELoss 函数;

第四个参数是约束条件的名字,需要给每一个约束条件命名,方便后续对其索引。此处命名为 "Sup" 即可。

3.4 超参数设定

接下来需要指定训练轮数和学习率,此处按实验经验,使用 200 轮训练轮数,初始学习率为 0.001,预热轮数为 5,全局梯度裁剪系数为 1.0,权重衰减为 1e-5。

# training settings
TRAIN:
  epochs: 200
  iters_per_epoch: 1000
  save_freq: 10
  eval_during_train: true
  eval_freq: 1
  lr_scheduler:
    epochs: ${TRAIN.epochs}
    iters_per_epoch: ${TRAIN.iters_per_epoch}
    learning_rate: 1.0e-3
    gamma: 0.9
    decay_steps: 5000
    by_epoch: false
    warmup_epoch: 5
    warmup_start_lr: 0.0
  weight_decay: 1.0e-5
  grad_clip: 1.0
  batch_size: 64

3.5 优化器构建

训练过程会调用优化器来更新模型参数,此处选择较为常用的 Adam 优化器,并配合使用机器学习中常用的 ExponentialDecay 学习率调整策略。

# set optimizer
lr_scheduler = ppsci.optimizer.lr_scheduler.ExponentialDecay(
    **cfg.TRAIN.lr_scheduler
)()
optimizer = ppsci.optimizer.AdamW(
    lr_scheduler,
    weight_decay=cfg.TRAIN.weight_decay,
    grad_clip=paddle.nn.ClipGradByGlobalNorm(cfg.TRAIN.grad_clip),
)(model)

3.6 评估器构建

在训练过程中通常会按一定轮数间隔,用验证集(测试集)评估当前模型的训练情况,因此使用 ppsci.validate.SupervisedValidator 构建评估器。

# set validator
test_inputs, test_outputs = prepare_ns_dataset(
    cfg.DATA.path,
    "test",
    cfg.DATA.components,
    cfg.DATA.prev_steps,
    cfg.DATA.pred_steps,
    cfg.EVAL.test_samples,
    cfg.DATA.downsample,
)
print("testing input ", test_inputs.shape, "testing label", test_outputs.shape)
test_outputs = einops.rearrange(test_outputs, "b t h w c -> b (t h w) c")
h, w = test_inputs.shape[2:4]
x_star = np.linspace(0, 1, h, dtype=dtype)
y_star = np.linspace(0, 1, w, dtype=dtype)
x_star, y_star = np.meshgrid(x_star, y_star, indexing="ij")
test_coords = np.hstack([x_star.flatten()[:, None], y_star.flatten()[:, None]])
test_coords = np.broadcast_to(
    test_coords[None, :], [len(test_inputs), test_outputs.shape[1], 2]
)

def l2_err_func(
    output_dict: Dict[str, np.ndarray],
    label_dict: Dict[str, np.ndarray],
) -> paddle.Tensor:
    s_key = cfg.MODEL.output_keys[0]
    l2_error = (
        (output_dict[s_key] - label_dict[s_key]).norm(axis=1)
        / label_dict[s_key].norm(axis=1)
    ).mean()  # average along batch and channels
    return {"s_l2_err": l2_error}

s_validator = ppsci.validate.SupervisedValidator(
    {
        "dataset": {
            "name": "NamedArrayDataset",
            "input": {"u": test_inputs, "y": test_coords},
            "label": {"s": test_outputs},
        },
        "batch_size": cfg.EVAL.batch_size,
    },
    loss=ppsci.loss.MSELoss("mean"),
    metric={"s": ppsci.metric.FunctionalMetric(l2_err_func)},
    name="s_validator",
)
validator = {s_validator.name: s_validator}

过程中我们使用了自定义的评估函数 l2_err_func,用于评估测试集上所有样本、三个输出物理量的 2-范数误差。

3.7 模型训练、评估

完成上述设置之后,只需要将上述实例化的对象按顺序传递给 ppsci.solver.Solver,然后启动训练、评估。

# initialize solver
solver = ppsci.solver.Solver(
    model,
    constraint,
    validator=validator,
    optimizer=optimizer,
    cfg=cfg,
)
# train model
solver.train()

4. 完整代码

ns_cvit.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
"""
Reference: https://github.com/PredictiveIntelligenceLab/cvit/tree/main/ns/
"""

import os
import re
from os import path as osp
from typing import Dict
from typing import Sequence
from typing import Tuple

import einops
import h5py
import hydra
import numpy as np
import paddle
import tqdm
from matplotlib import pyplot as plt
from numpy.lib.stride_tricks import sliding_window_view
from omegaconf import DictConfig

import ppsci

dtype = paddle.get_default_dtype()


# Construct the full dataset
def prepare_ns_dataset(
    directory: str,
    mode: str,
    keys: Sequence[str],
    prev_steps: int,
    pred_steps: int,
    num_samples: int,
    downsample: int = 1,
):
    # Use list comprehension for efficiency
    file_names = [
        osp.join(directory, f)
        for f in os.listdir(directory)
        if re.match(f"^NavierStokes2D_{mode}", f)
    ]

    # Initialize dictionaries to hold the inputs and outputs
    data_dict = {key: [] for key in keys}

    num_files = len(file_names)

    f = h5py.File(file_names[0], "r")
    s = f[mode][keys[0]].shape[0]
    for i in tqdm.trange(min(num_files, num_samples // s + 1), desc="Reading files"):
        with h5py.File(file_names[i], "r") as f:
            data_group = f[mode]

            for key in keys:
                # Use memory-mapping to reduce memory usage
                data_dict[key].append(np.array(data_group[key], dtype=dtype))

    for key in keys:
        data_dict[key] = np.vstack(data_dict[key])

    data = np.concatenate(
        [np.expand_dims(arr, axis=-1) for arr in data_dict.values()], axis=-1
    )
    data = data[:num_samples, :, ::downsample, ::downsample, :]

    # Use sliding window to generate inputs and outputs
    sliding_data = sliding_window_view(
        data, window_shape=prev_steps + pred_steps, axis=1
    )
    sliding_data = einops.rearrange(sliding_data, "n m h w c s -> (n m) s h w c")

    inputs = sliding_data[:, :prev_steps, ...]
    outputs = sliding_data[:, prev_steps : prev_steps + pred_steps, ...]

    return inputs, outputs  # (B, T, H, W, C) (B, T', H, W, C)


def train(cfg: DictConfig):
    # set model
    model = ppsci.arch.CVit(**cfg.MODEL)

    # prepare training data
    train_inputs, train_outputs = prepare_ns_dataset(
        cfg.DATA.path,
        "train",
        cfg.DATA.components,
        cfg.DATA.prev_steps,
        cfg.DATA.pred_steps,
        cfg.TRAIN.train_samples,
        cfg.DATA.downsample,
    )
    print("training input ", train_inputs.shape, "training label", train_outputs.shape)
    train_outputs = einops.rearrange(train_outputs, "b t h w c -> b (t h w) c")
    h, w = train_inputs.shape[2:4]
    x_star = np.linspace(0, 1, h, dtype=dtype)
    y_star = np.linspace(0, 1, w, dtype=dtype)
    x_star, y_star = np.meshgrid(x_star, y_star, indexing="ij")
    train_coords = np.hstack([x_star.flatten()[:, None], y_star.flatten()[:, None]])
    train_coords = np.broadcast_to(
        train_coords[None, :], [len(train_inputs), train_outputs.shape[1], 2]
    )

    # set constraint
    def random_query(
        input_dict: Dict[str, np.ndarray],
        label_dict: Dict[str, np.ndarray],
        weight_dict: Dict[str, np.ndarray],
    ) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray]]:
        y_key = cfg.MODEL.input_keys[1]
        s_key = cfg.MODEL.output_keys[0]
        # random select coords and labels
        npos = input_dict[y_key].shape[1]
        assert cfg.TRAIN.num_query_points <= npos, (
            f"Number of query points({cfg.TRAIN.num_query_points}) must be "
            f"less than or equal to number of positions({npos})."
        )
        random_pos = np.random.choice(npos, cfg.TRAIN.num_query_points, replace=False)
        input_dict[y_key] = input_dict[y_key][0, random_pos]
        label_dict[s_key] = label_dict[s_key][:, random_pos]
        return (input_dict, label_dict, weight_dict)

    sup_constraint = ppsci.constraint.SupervisedConstraint(
        {
            "dataset": {
                "name": "NamedArrayDataset",
                "input": {"u": train_inputs, "y": train_coords},
                "label": {"s": train_outputs},
                "transforms": [
                    {
                        "FunctionalTransform": {
                            "transform_func": random_query,
                        },
                    },
                ],
            },
            "batch_size": cfg.TRAIN.batch_size,
            "auto_collation": False,  # NOTE: Explicitly disable auto collation
        },
        output_expr={"s": lambda out: out["s"]},
        loss=ppsci.loss.MSELoss("mean"),
        name="Sup",
    )
    # wrap constraints together
    constraint = {sup_constraint.name: sup_constraint}

    # set optimizer
    lr_scheduler = ppsci.optimizer.lr_scheduler.ExponentialDecay(
        **cfg.TRAIN.lr_scheduler
    )()
    optimizer = ppsci.optimizer.AdamW(
        lr_scheduler,
        weight_decay=cfg.TRAIN.weight_decay,
        grad_clip=paddle.nn.ClipGradByGlobalNorm(cfg.TRAIN.grad_clip),
    )(model)

    # set validator
    test_inputs, test_outputs = prepare_ns_dataset(
        cfg.DATA.path,
        "test",
        cfg.DATA.components,
        cfg.DATA.prev_steps,
        cfg.DATA.pred_steps,
        cfg.EVAL.test_samples,
        cfg.DATA.downsample,
    )
    print("testing input ", test_inputs.shape, "testing label", test_outputs.shape)
    test_outputs = einops.rearrange(test_outputs, "b t h w c -> b (t h w) c")
    h, w = test_inputs.shape[2:4]
    x_star = np.linspace(0, 1, h, dtype=dtype)
    y_star = np.linspace(0, 1, w, dtype=dtype)
    x_star, y_star = np.meshgrid(x_star, y_star, indexing="ij")
    test_coords = np.hstack([x_star.flatten()[:, None], y_star.flatten()[:, None]])
    test_coords = np.broadcast_to(
        test_coords[None, :], [len(test_inputs), test_outputs.shape[1], 2]
    )

    def l2_err_func(
        output_dict: Dict[str, np.ndarray],
        label_dict: Dict[str, np.ndarray],
    ) -> paddle.Tensor:
        s_key = cfg.MODEL.output_keys[0]
        l2_error = (
            (output_dict[s_key] - label_dict[s_key]).norm(axis=1)
            / label_dict[s_key].norm(axis=1)
        ).mean()  # average along batch and channels
        return {"s_l2_err": l2_error}

    s_validator = ppsci.validate.SupervisedValidator(
        {
            "dataset": {
                "name": "NamedArrayDataset",
                "input": {"u": test_inputs, "y": test_coords},
                "label": {"s": test_outputs},
            },
            "batch_size": cfg.EVAL.batch_size,
        },
        loss=ppsci.loss.MSELoss("mean"),
        metric={"s": ppsci.metric.FunctionalMetric(l2_err_func)},
        name="s_validator",
    )
    validator = {s_validator.name: s_validator}

    # initialize solver
    solver = ppsci.solver.Solver(
        model,
        constraint,
        validator=validator,
        optimizer=optimizer,
        cfg=cfg,
    )
    # train model
    solver.train()


def evaluate(cfg: DictConfig):
    # set model
    model = ppsci.arch.CVit(**cfg.MODEL)

    # init validator
    test_inputs, test_outputs = prepare_ns_dataset(
        cfg.DATA.path,
        "test",
        cfg.DATA.components,
        cfg.DATA.prev_steps,
        cfg.DATA.pred_steps,
        cfg.EVAL.test_samples,
        cfg.DATA.downsample,
    )
    print("test data", test_inputs.shape, test_outputs.shape)
    test_outputs = einops.rearrange(test_outputs, "b t h w c -> b (t h w) c")
    h, w = test_inputs.shape[2:4]
    x_star = np.linspace(0, 1, h, dtype=dtype)
    y_star = np.linspace(0, 1, w, dtype=dtype)
    x_star, y_star = np.meshgrid(x_star, y_star, indexing="ij")
    test_coords = np.hstack([x_star.flatten()[:, None], y_star.flatten()[:, None]])
    test_coords = np.broadcast_to(
        test_coords[None, :], [len(test_inputs), test_outputs.shape[1], 2]
    )

    def l2_err_func(
        output_dict: Dict[str, np.ndarray],
        label_dict: Dict[str, np.ndarray],
    ) -> paddle.Tensor:
        s_key = cfg.MODEL.output_keys[0]
        l2_error = (
            (output_dict[s_key] - label_dict[s_key]).norm(axis=1)
            / label_dict[s_key].norm(axis=1)
        ).mean()  # average along batch and channels
        return {"s_l2_err": l2_error}

    s_validator = ppsci.validate.SupervisedValidator(
        {
            "dataset": {
                "name": "NamedArrayDataset",
                "input": {"u": test_inputs, "y": test_coords},
                "label": {"s": test_outputs},
            },
            "batch_size": cfg.EVAL.batch_size,
        },
        loss=ppsci.loss.MSELoss("mean"),
        metric={"s_err": ppsci.metric.FunctionalMetric(l2_err_func)},
        name="s_validator",
    )
    validator = {s_validator.name: s_validator}

    # initialize solver
    solver = ppsci.solver.Solver(
        model,
        validator=validator,
        cfg=cfg,
    )
    # train model
    solver.eval()


def export(cfg: DictConfig):
    # set model
    model = ppsci.arch.CVit(**cfg.MODEL)

    # initialize solver
    solver = ppsci.solver.Solver(model, cfg=cfg)
    # export model
    from paddle.static import InputSpec

    input_spec = [
        {
            model.input_keys[0]: InputSpec(
                [None, *cfg.MODEL.spatial_dims, cfg.MODEL.in_dim],
                "float32",
                name=model.input_keys[0],
            ),
            model.input_keys[1]: InputSpec(
                [None, cfg.MODEL.coords_dim], "float32", name=model.input_keys[1]
            ),
        },
    ]
    solver.export(
        input_spec, cfg.INFER.export_path, with_onnx=False, ignore_modules=[einops]
    )


def inference(cfg: DictConfig):
    from deploy.python_infer import pinn_predictor

    predictor = pinn_predictor.PINNPredictor(cfg)
    test_inputs, test_outputs = prepare_ns_dataset(
        cfg.DATA.path,
        "test",
        cfg.DATA.components,
        cfg.DATA.prev_steps,
        cfg.DATA.rollout_steps,
        cfg.INFER.test_samples,
        cfg.DATA.downsample,
    )
    print("test data", test_inputs.shape, test_outputs.shape)
    test_outputs = einops.rearrange(test_outputs, "b t h w c -> b (t h w) c")
    h, w = test_inputs.shape[2:4]
    x_star = np.linspace(0, 1, h, dtype=dtype)
    y_star = np.linspace(0, 1, w, dtype=dtype)
    x_star, y_star = np.meshgrid(x_star, y_star, indexing="ij")
    test_coords = np.hstack([x_star.flatten()[:, None], y_star.flatten()[:, None]])
    s_key = cfg.MODEL.output_keys[0]

    def rollout(x, coords, prev_steps=2, pred_steps=1, rollout_steps=5):
        b, _, h, w, c = x.shape
        pred_list = []
        for k in range(rollout_steps):
            input_dict = {"u": x, "y": coords}
            pred = predictor.predict(input_dict, batch_size=None)
            # mapping data to cfg.INFER.output_keys
            pred = {
                store_key: pred[infer_key]
                for store_key, infer_key in zip(cfg.MODEL.output_keys, pred.keys())
            }[s_key]
            pred = pred.reshape(b, pred_steps, h, w, c)
            pred_list.append(pred)

            # auto regression step
            x = np.concatenate([x, pred], axis=1)
            x = x[:, -prev_steps:]

        pred = np.concatenate(pred_list, axis=1)
        return pred

    l2_error_list = []
    for i in range(0, len(test_inputs), cfg.INFER.batch_size):
        st, ed = i, min(i + cfg.INFER.batch_size, len(test_inputs))
        pred = rollout(
            test_inputs[st:ed],
            test_coords,
            prev_steps=cfg.DATA.prev_steps,
            pred_steps=cfg.DATA.pred_steps,
            rollout_steps=cfg.DATA.rollout_steps,
        )
        pred = einops.rearrange(pred, "B T H W C-> B (T H W) C")
        y = test_outputs[st:ed]

        diff_norms = np.linalg.norm(pred - y, axis=1)
        y_norms = np.linalg.norm(y, axis=1)

        l2_error = (diff_norms / y_norms).mean(axis=1)
        l2_error_list.append(l2_error)

    l2_error = np.mean(np.array(l2_error_list))
    print(f"{cfg.INFER.rollout_steps}-step l2_error:", l2_error)

    # plot prediction of the first sample
    plt.rcParams.update(
        {
            # "text.usetex": True, # NOTE: This may cause error when using latex
            "font.family": "serif",
            "font.serif": ["Computer Modern Roman"],
            "font.size": 24,
        }
    )
    pred = einops.rearrange(
        pred, "B (T H W) C -> B T H W C", T=cfg.INFER.rollout_steps, W=w, H=h
    )
    y = einops.rearrange(
        y, "B (T H W) C -> B T H W C", T=cfg.INFER.rollout_steps, W=w, H=h
    )

    from mpl_toolkits.axes_grid1 import make_axes_locatable

    def plot(pred, ref, filename):
        fig, axes = plt.subplots(
            3,
            cfg.INFER.rollout_steps,
            figsize=((cfg.INFER.rollout_steps) * 5, 3 * 5),
            gridspec_kw={"width_ratios": [1, 1, 1, 1.2]},
        )

        # plot reference
        for t in range(cfg.INFER.rollout_steps):
            res = pred[t]
            im = axes[0, t].imshow(
                res, cmap="turbo", vmin=res.min(), vmax=res.max(), aspect="auto"
            )
            axes[0, t].set_yticks([])
            axes[0, t].xaxis.set_visible(False)
        axes[0, 0].set_ylabel("Reference", size="large", labelpad=20)
        divider = make_axes_locatable(axes[0, -1])
        cax = divider.append_axes("right", size="5%", pad=0.5)
        fig.colorbar(im, cax=cax)
        # plot prediction
        for t in range(cfg.INFER.rollout_steps):
            res = ref[t]
            im = axes[1, t].imshow(
                res, cmap="turbo", vmin=res.min(), vmax=res.max(), aspect="auto"
            )
            axes[1, t].set_yticks([])
            axes[1, t].xaxis.set_visible(False)
        axes[1, 0].set_ylabel("Prediction", size="large", labelpad=20)
        divider = make_axes_locatable(axes[1, -1])
        cax = divider.append_axes("right", size="5%", pad=0.5)
        fig.colorbar(im, cax=cax)
        # plot abs error
        for t in range(cfg.INFER.rollout_steps):
            res = pred[t] - ref[t]
            im = axes[2, t].imshow(
                res, cmap="turbo", vmin=res.min(), vmax=res.max(), aspect="auto"
            )
            axes[2, t].set_yticks([])
            axes[2, t].xaxis.set_visible(False)
        axes[2, 0].set_ylabel("Abs. Error", size="large", labelpad=20)
        divider = make_axes_locatable(axes[2, -1])
        cax = divider.append_axes("right", size="5%", pad=0.5)
        fig.colorbar(im, cax=cax)
        plt.tight_layout()
        plt.savefig(filename)
        plt.close()

    plot(pred[0, ..., 0], y[0, ..., 0], "./ns_u.png")
    plot(pred[0, ..., 1], y[0, ..., 1], "./ns_ux.png")
    plot(pred[0, ..., 2], y[0, ..., 2], "./ns_uy.png")


@hydra.main(
    version_base=None, config_path="./conf", config_name="ns_cvit_small_8x8.yaml"
)
def main(cfg: DictConfig):
    if cfg.mode == "train":
        train(cfg)
    elif cfg.mode == "eval":
        evaluate(cfg)
    elif cfg.mode == "export":
        export(cfg)
    elif cfg.mode == "infer":
        inference(cfg)
    else:
        raise ValueError(
            f"cfg.mode should in ['train', 'eval', 'export', 'infer'], but got '{cfg.mode}'"
        )


if __name__ == "__main__":
    main()

5. 结果展示

在测试集上的预测结果、参考结果以及绝对值误差如下图所示。

ns_u.jpg

左侧为 CVit 对物理量 u 的预测结果,中间为物理量 u 的参考结果,右侧为两者的差值

ns_ux.jpg

左侧为 CVit 对物理量 ux 的预测结果,中间为物理量 ux 的参考结果,右侧为两者的差值

ns_uy.jpg

左侧为 CVit 对物理量 uy 的预测结果,中间为物理量 uy 的参考结果,右侧为两者的差值

可以看到模型的三个预测物理量和参考结果基本一致,通过自回归的方式,连续推理 4 步的平均误差为 0.039%。

6. 参考资料