Getting Started

Preliminaries

You may find it helpful to read the documentation of EnsembleKalmanProcesses.jl before reading this section.

Every calibration requires

  • observational data, which can be a Vector or an EnsembleKalmanProcess.Observation
  • a prior parameter distribution. The easiest way to construct a distribution is with the EnsembleKalmanProcess.constrained_gaussian function,
  • a forward model, which uses input parameters to return diagnostic output
  • an observation map, which maps the forward model's diagnostic output to a vector comparable to the observations

Implementing your experiment

All calibrate functions require a backend, an EnsembleKalmanProcesses.EnsembleKalmanProcess object, and a model interface. This tutorial will not go into details on how to construct the EnsembleKalmanProcess object. Please refer to the docs instead.

Backend system

Backends

For more information about the backend system, refer to the documentation here.

There are three different kind of backends which are JuliaBackend, WorkerBackend, and the HPC cluster backends.

The JuliaBackend is the simplest backend. The work done by each ensemble member is done sequentially.

import ClimaCalibrate

backend = ClimaCalibrate.JuliaBackend()

Next, the WorkerBackend is a backend compatible with Distributed.jl. The work done by each ensemble member is done in parallel on different processes. This backend is compatible with the Slurm and PBS job schedulers. It requires starting a job with the resources necessary to start the worker processes. In the example below, worker processes are being launched by addprocs on a HPC cluster that supports Slurm. You would pass backend to the calibrate function.

import ClimaCalibrate
import Distributed

Distributed.addprocs(ClimaCalibrate.SlurmManager())
backend = ClimaCalibrate.WorkerBackend()

Finally, the HPCBackend is a backend specfic to each HPC cluster. The work done by each ensemble member is done in parallel on different jobs. In the example, each job would start with the directives, modules, and env_vars listed. The job would last for 720 minutes with single task of 12 CPUs and 1 GPU with regular job priority. The climacommon module will be loaded when the job starts and the environment variables for ClimaComms will be set.

import ClimaCalibrate

backend = ClimaCalibrate.DerechoBackend(;
    directives = [
        :job_priority => "regular",
        :time => 720,
        :ntasks => 1,
        :cpus_per_task => 12,
        :gpus_per_task => 1,
    ],
    modules = ["climacommon"],
    env_vars = ["CLIMACOMMS_CONTEXT" => "SINGLETON", "CLIMACOMMS_DEVICE" => "CUDA"],
)

Model interface

ClimaCalibrate provides the abstract type AbstractModelInterface. For calibration, you will create a struct that will subtype this type and implements the required interface for this function to work.

The necessary functions are

  • forward_model(interface, iteration, member) which runs the forward model for a single ensemble member.
  • observation_map(interface, iteration) which processes model output and returns a matrix of outputs where each column is the forward model output. This matrix is called the G_ensemble matrix.

If you want to calibrate using one of the HPCBackends, you also need to implement

  • model_interface_filepath(interface) which returns the path to the file that defines the model interface.

Forward Model

Your forward model must implement the forward_model(interface, iteration, member) function stub.

Since this function only takes in the iteration and member numbers, there are some hooks to obtain parameters and the output directory:

which can be used to set the forward model's output directory.

member's parameter file, which can be loaded in via TOML or passed to ClimaParams.

Observation map

Observational data

Observational data generally consists of a vector of observations with length d and the covariance matrix of the observational noise with size d × d.

If you need to stack or sample from observations, EnsembleKalmanProcesses.jl's Observation or ObservationSeries are fully-featured.

For preprocessing observational data, you want to preprocess for NaNs and regrid and convert units to match the simulation data and vice versa.

If you are using ClimaAnalysis to preprocess the observational data, then you may want to use ObservationRecipe to create observations from OutputVars.

An observation map to process model output and return the full ensemble's observations is also required.

This is provided by implementing the function stub observation_map(interface, iteration). This function needs to return an Matrix where the ith column is the ith ensemble member's observational output. This matrix is called the G ensemble matrix.

Here is a simple readable template for the observation_map

function ClimaCalibrate.observation_map(interface, iteration)
    # This assumes the output_dir is a field of interface
    (; output_dir) = interface
    ekp = JLD2.load_object(ClimaCalibrate.ekp_path(output_dir, iteration))
    ensemble_size = EKP.get_N_ens(ekp)
    G_ensemble = ClimaCalibrate.g_ens_matrix(ekp)
    for member in 1:ensemble_size
        G_ensemble[:, member] = process_member_data(iteration, member)
    end
    return G_ensemble
end

Note that each column of the G ensemble matrix should match with the observations. A common source of error is that the ordering of the variables in the observations is not the same as the ordering of the variables for the columns of the G ensemble matrix.

GEnsembleBuilder

If you are using ObservationRecipe to construct your observations and are using ClimaAnalysis to postprocess your simulation output, then you might want to use GEnsembleBuilder which simplifies the construction of the G ensemble matrix.

Optional postprocessing

It may be the case that observation_map is insufficient as you need more information, such as information from the ekp object to compute G_ensemble. Further postprocessing of the G_ensemble object can be done by implementing the postprocess_g_ensemble as shown below.

function ClimaCalibrate.postprocess_g_ensemble(
    interface,
    ekp,
    g_ensemble,
    prior,
    output_dir,
    iteration
)
    return g_ensemble
end

After each evaluation of the observation map and before updating the ensemble, it may be helpful to print the errors from the ekp object or plot G_ensemble. This can be done by implementing the analyze_iteration as shown below.

function ClimaCalibrate.analyze_iteration(
    interface,
    ekp,
    g_ensemble,
    prior,
    output_dir,
    iteration,
)
    @info "Analyzing iteration"
    @info "Iteration $iteration"
    @info "Current mean parameter: $(EnsembleKalmanProcesses.get_ϕ_mean_final(prior, ekp))"
    @info "g_ensemble: $g_ensemble"
    @info "output_dir: $output_dir"
    return nothing
end

Parameters

Every parameter that is being calibrated requires a prior distribution to sample from.

EnsembleKalmanProcesses.jl's constrained_gaussian provides a user-friendly way to constructor Gaussian distributions.

Multiple distributions can be combined using combine_distributions(vec_of_distributions).

For more information, see the EKP documentation for prior distributions.

Experiment Configuration

A calibration consisting of m ensemble members that will run for n iterations. The recommended ensemble size is a function of the chosen method and the number of parameters being calibrated. See the EnsembleKalmanProcesses.jl documentation for more information for choosing the appropriate ensemble size.

Calibrate

Now all of the pieces should be in place:

  • forward map
  • observation map
  • observations
  • covariance matrix of the observations (noise)
  • prior distribution
  • ensemble size
  • number of iterations

Lastly, you need to set the output directory and the number of iterations to run for.

n_iterations = 7
output_dir = "output/my_experiment"

Once all of this has been set up, you can call put it all together using the calibrate function:

# Construct the EnsembleKalmanProcess object as ekp
ClimaCalibrate.calibrate(
    backend,
    ekp,
    interface,
    n_iterations,
    prior,
    output_dir,
)

For more information on parallelizing your calibration, see the Backends page.

File structure

For a calibration that ran for a single iteration, the calibration output directory might look like this.

.
├── iteration_001
│   ├── eki_file.jld2
│   ├── G_ensemble.jld2
│   ├── member_001
│   │   ├── checkpoint.txt
│   │   └── parameters.toml
│   ├── member_002
│   │   ├── checkpoint.txt
│   │   └── parameters.toml
│   ├── member_003
│   │   ├── checkpoint.txt
│   │   └── parameters.toml
│   └── prior.jld2
└── iteration_002
    ├── eki_file.jld2
    ├── member_001
    │   └── parameters.toml
    ├── member_002
    │   └── parameters.toml
    └── member_003
        └── parameters.toml

Each file in the output directory serves a specific purpose:

  • eki_file.jld2: The serialized EnsembleKalmanProcess state saved before the iteration runs. For example, iteration_001/eki_file.jld2 holds the state used to generate the parameters for iteration 1.
  • parameters.toml: Each member's sampled parameter values, written before the forward model runs. Load this via TOML or pass it to ClimaParams in your forward_model.
  • G_ensemble.jld2: The G ensemble matrix produced by the observation map after all forward models in the iteration complete.
  • checkpoint.txt: A flag file written when a member's forward model completes successfully, used to skip completed members on restart.
  • prior.jld2: The prior distribution, saved once in iteration_001.

The JLD2 files can be loaded using JLD2.

To access these paths programmatically:

Checkpointing

ClimaCalibrate checkpoints each forward model and iteration so that an interrupted calibration can seamlessly pick up where it left off without wasting resources.

If a calibration (run via calibrate) exits after completing an iteration, when it is restarted it will automatically run the next iteration. This is done by checking if the ensemble forward map results file (G_ensemble.jld2) and the EKI file (eki_file.jld2) have been saved.

If a calibration is interrupted during forward model execution, causing a partial iteration, incomplete forward models will be rerun when the calibration is restarted. Completed forward models will not be rerun. This is done by checking each model's checkpoint file and the flag it contains.

Forward model restarts

Although the model is checkpointed, this does not mean the forward model will automatically restarts. This functionality is delegated to forward model.

Example Calibrations

The example tutorial provides a clear calibration example that can be run locally using the WorkerBackend.

Another example experiment can be found in the package repo under experiments/surface_fluxes_perfect_model. This experiment uses the SurfaceFluxes.jl package to generate a physical model that calculates the Monin Obukhov turbulent surface fluxes based on idealized atmospheric and surface conditions. Since this is a "perfect model" example, the same model is used to generate synthetic observations using its default parameters and a small amount of noise. These synthetic observations are considered to be the ground truth, which is used to assess the model ensembles' performance when parameters are drawn from the prior parameter distributions.

It is a perfect-model calibration, using its own output as observational data. By default, it runs 20 ensemble members for 8 iterations. This example can be run on the most common backend, the JuliaBackend, with the following script:

import ClimaCalibrate
import EnsembleKalmanProcesses as EKP

include(joinpath(pkgdir(ClimaCalibrate), "experiments", "surface_fluxes_perfect_model", "utils.jl"))
@show ensemble_size n_iterations observation variance prior

# Construct the initial ensemble and EKP object
initial_ensemble = EKP.construct_initial_ensemble(prior, ensemble_size)
ekp = EKP.EnsembleKalmanProcess(
    initial_ensemble,
    observation,
    variance,
    EKP.Inversion(),
    EKP.default_options_dict(EKP.Inversion()),
)

output_dir = "my_experiment"
mkpath(output_dir)
eki = ClimaCalibrate.calibrate(
    JuliaBackend(),
    ekp,
    SurfaceFluxModelInterface(),
    n_iterations,
    prior,
    output_dir,
)

theta_star_vec =
    (; coefficient_a_m_businger = 4.7, coefficient_a_h_businger = 4.7)

convergence_plot(
    eki,
    prior,
    theta_star_vec,
    ["coefficient_a_m_businger", "coefficient_a_h_businger"],
    output_dir,
)

g_vs_iter_plot(eki, output_dir)