跳转至

FengWu

暂无

暂无

暂无

# Download sample input data
wget -c https://paddle-org.bj.bcebos.com/paddlescience/models/Fengwu/input1.npy -P ./data
wget -c https://paddle-org.bj.bcebos.com/paddlescience/models/Fengwu/input2.npy -P ./data

# Download pretrain model weight
wget -c https://paddle-org.bj.bcebos.com/paddlescience/models/Fengwu/fengwu_v2.onnx -P ./inference

# inference
python predict.py

1. 背景简介

随着近年来全球气候变化加剧,极端天气频发,各界对天气预报的时效和精度的期待更是与日俱增。如何提高天气预报的时效和准确度,一直是业内的重点课题。AI大模型“风乌”基于多模态和多任务深度学习方法构建,实现在高分辨率上对核心大气变量进行超过10天的有效预报,并在80%的评估指标上超越DeepMind发布的模型GraphCast。同时,“风乌”仅需30秒即可生成未来10天全球高精度预报结果,在效率上大幅优于传统模型。

2. 模型原理

本章节仅对风乌气象大模型的原理进行简单地介绍,详细的理论推导请阅读 FengWu: Pushing the Skillful Global Medium-range Weather Forecast beyond 10 Days Lead

模型的总体结构如图所示:

result

模型结构

模型将气候变量作为不同模态的输入。在 Modal-Customized Encoder 中将多个模态的特征进行编码,并使用基于 Transformer 的 Cross-modal Fuser 对编码后的特征进行融合,得到联合表示,最后在 Modal-Customized Decoder 中从联合表示中分别预测气候变量。

模型使用预训练权重推理,接下来将介绍模型的推理过程。

3. 模型构建

在该案例中,实现了 FengWuPredictor用于ONNX模型的推理:

examples/fengwu/predict.py
    # load mean and std data
    self.data_mean = np.load(cfg.INFER.mean_path)[:, np.newaxis, np.newaxis]
    self.data_std = np.load(cfg.INFER.std_path)[:, np.newaxis, np.newaxis]

def _preprocess_data(
    self, input_data_prev: np.ndarray, input_data_next: np.ndarray
) -> np.ndarray:
    input_data_prev_after_norm = (
        input_data_prev.astype("float32") - self.data_mean
    ) / self.data_std
    input_data_next_after_norm = (
        input_data_next.astype("float32") - self.data_mean
    ) / self.data_std
    input_data = np.concatenate(
        (input_data_prev_after_norm, input_data_next_after_norm), axis=0
    )[np.newaxis, :, :, :]
    input_data = input_data.astype(np.float32)

    return input_data

def predict(
    self,
    input_data_prev: np.ndarray,
    input_data_next: np.ndarray,
    batch_size: int = 1,
) -> List[np.ndarray]:
    """Predicts the output of the yinglong model for the given input.

    Args:
        input_data_prev(np.ndarray): Atomospheric data at the first time moment.
        input_data_next(np.ndarray): Atmospheric data six later.
        batch_size (int, optional): Batch size, now only support 1. Defaults to 1.

    Returns:
        List[np.ndarray]: Prediction for next 56 hours.
    """
    if batch_size != 1:
        raise ValueError(
            f"FengWuPredictor only support batch_size=1, but got {batch_size}"
        )

    # process data
    input_data = self._preprocess_data(input_data_prev, input_data_next)

    output_data_list = []
    # prepare input dict
    for _ in range(self.PREDICT_TIMESTAMP):
        input_dict = {
            self.input_names[0]: input_data,
        }

        # run predictor
        output_data = self.predictor.run(None, input_dict)[0]
        input_data = np.concatenate(
            (
                input_data[:, self.NUM_ATMOSPHERIC_FEATURES :],
examples/fengwu/conf/fengwu.yaml
# inference settings
INFER:
  pretrained_model_path: null
  export_path: inference/fengwu_v2
  onnx_path: ${INFER.export_path}.onnx
  device: gpu
  engine: onnx
  precision: fp32
  ir_optim: false
  min_subgraph_size: 30
  gpu_mem: 100
  gpu_id: 0
  max_batch_size: 1
  num_cpu_threads: 10
  batch_size: 1
  mean_path: ./data_mean.npy
  std_path: ./data_std.npy
  input_file: './data/input1.npy'
  input_next_file: './data/input2.npy'

其中,input_fileinput_next_file 分别代表网络模型输入的开始时刻气象数据和6小时后的气象数据。

4. 结果可视化

模型推理结果包含 56 个 npy 文件,表示从预测时间点开始,未来 14 天内每隔6小时的气象数据。结果可视化需要先将数据从 npy 转换为 NetCDF 格式,然后采用 ncvue 进行查看。

  1. 安装相关依赖

    pip install cdsapi netCDF4 ncvue
    

  2. 使用脚本进行数据转换

    python convert_data.py
    

  3. 使用 ncvue 打开转换后的 NetCDF 文件, ncvue 具体说明见ncvue官方文档

5. 完整代码

examples/fengwu/predict.py
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from os import path as osp
from typing import List

import hydra
import numpy as np
import paddle
from omegaconf import DictConfig
from packaging import version

from deploy.python_infer import base
from ppsci.utils import logger


class FengWuPredictor(base.Predictor):
    """General predictor for FengWu model.

    Args:
        cfg (DictConfig): Running configuration.
    """

    # 14 day with time-interval of six hours
    PREDICT_TIMESTAMP = int(14 * 24 / 6)
    # Where 69 represents 69 atmospheric features, The first four variables are surface variables in the order of ['u10', 'v10', 't2m', 'msl'],
    # followed by non-surface variables in the order of ['z', 'q', 'u', 'v', 't']. Each data has 13 levels, which are ordered as
    # [50, 100, 150, 200, 250, 300, 400, 500, 600, 700, 850, 925, 1000].
    # Therefore, the order of the 69 variables is [u10, v10, t2m, msl, z50, z100, ..., z1000, q50, q100, ..., q1000, t50, t100, ..., t1000].
    NUM_ATMOSPHERIC_FEATURES = 69

    def __init__(
        self,
        cfg: DictConfig,
    ):
        assert cfg.INFER.engine == "onnx", "FengWu engine only supports 'onnx'."

        super().__init__(
            pdmodel_path=None,
            pdiparams_path=None,
            device=cfg.INFER.device,
            engine=cfg.INFER.engine,
            precision=cfg.INFER.precision,
            onnx_path=cfg.INFER.onnx_path,
            ir_optim=cfg.INFER.ir_optim,
            min_subgraph_size=cfg.INFER.min_subgraph_size,
            gpu_mem=cfg.INFER.gpu_mem,
            gpu_id=cfg.INFER.gpu_id,
            max_batch_size=cfg.INFER.max_batch_size,
            num_cpu_threads=cfg.INFER.num_cpu_threads,
        )
        self.log_freq = cfg.log_freq

        # get input names
        self.input_names = [
            input_node.name for input_node in self.predictor.get_inputs()
        ]

        # get output names
        self.output_names = [
            output_node.name for output_node in self.predictor.get_outputs()
        ]

        # load mean and std data
        self.data_mean = np.load(cfg.INFER.mean_path)[:, np.newaxis, np.newaxis]
        self.data_std = np.load(cfg.INFER.std_path)[:, np.newaxis, np.newaxis]

    def _preprocess_data(
        self, input_data_prev: np.ndarray, input_data_next: np.ndarray
    ) -> np.ndarray:
        input_data_prev_after_norm = (
            input_data_prev.astype("float32") - self.data_mean
        ) / self.data_std
        input_data_next_after_norm = (
            input_data_next.astype("float32") - self.data_mean
        ) / self.data_std
        input_data = np.concatenate(
            (input_data_prev_after_norm, input_data_next_after_norm), axis=0
        )[np.newaxis, :, :, :]
        input_data = input_data.astype(np.float32)

        return input_data

    def predict(
        self,
        input_data_prev: np.ndarray,
        input_data_next: np.ndarray,
        batch_size: int = 1,
    ) -> List[np.ndarray]:
        """Predicts the output of the yinglong model for the given input.

        Args:
            input_data_prev(np.ndarray): Atomospheric data at the first time moment.
            input_data_next(np.ndarray): Atmospheric data six later.
            batch_size (int, optional): Batch size, now only support 1. Defaults to 1.

        Returns:
            List[np.ndarray]: Prediction for next 56 hours.
        """
        if batch_size != 1:
            raise ValueError(
                f"FengWuPredictor only support batch_size=1, but got {batch_size}"
            )

        # process data
        input_data = self._preprocess_data(input_data_prev, input_data_next)

        output_data_list = []
        # prepare input dict
        for _ in range(self.PREDICT_TIMESTAMP):
            input_dict = {
                self.input_names[0]: input_data,
            }

            # run predictor
            output_data = self.predictor.run(None, input_dict)[0]
            input_data = np.concatenate(
                (
                    input_data[:, self.NUM_ATMOSPHERIC_FEATURES :],
                    output_data[:, : self.NUM_ATMOSPHERIC_FEATURES],
                ),
                axis=1,
            )
            output_data = (
                output_data[0, : self.NUM_ATMOSPHERIC_FEATURES] * self.data_std
            ) + self.data_mean

            output_data_list.append(output_data)

        return output_data_list


def inference(cfg: DictConfig):
    # log paddlepaddle's version
    if version.Version(paddle.__version__) != version.Version("0.0.0"):
        paddle_version = paddle.__version__
        if version.Version(paddle.__version__) < version.Version("2.6.0"):
            logger.warning(
                f"Detected paddlepaddle version is '{paddle_version}', "
                "currently it is recommended to use release 2.6 or develop version."
            )
    else:
        paddle_version = f"develop({paddle.version.commit[:7]})"

    logger.info(f"Using paddlepaddle {paddle_version}")

    # create predictor
    predictor = FengWuPredictor(cfg)

    # load data
    input_data_prev = np.load(cfg.INFER.input_file).astype(np.float32)
    input_data_next = np.load(cfg.INFER.input_next_file).astype(np.float32)

    # run predictor
    output_data_list = predictor.predict(input_data_prev, input_data_next)

    # save predict data
    for i in range(FengWuPredictor.PREDICT_TIMESTAMP):
        output_save_path = osp.join(cfg.output_dir, f"output_{i}.npy")
        np.save(output_save_path, output_data_list[i])
        logger.info(f"Save output with timestamp:{i} to {output_save_path}.")


@hydra.main(version_base=None, config_path="./conf", config_name="fengwu.yaml")
def main(cfg: DictConfig):
    if cfg.mode == "infer":
        inference(cfg)
    else:
        raise ValueError(f"cfg.mode should in ['infer'], but got '{cfg.mode}'")


if __name__ == "__main__":
    main()

6. 结果展示

下图展示了模型的未来6小时平均海平面气压预测结果,更多指标可以使用 ncvue 查看。

result

未来6小时平均海平面气压

7. 参考资料