Parallelism and High Performance Computing (HPC)
One benefit of ensemble methods is their ability to be parallelized. The parallelism occurs outside of the EKP update, and so is not present in the source code. On this page we provide suggestions for parallelization for two types of problems:
- Run a parallel loop or map within your current Julia session.
- Run a parallel loop through a read/write file interface and workload manager. We provide some utilities to read/write files in a TOML format.
Case 1: Parallel code within Julia
Let's look at a simple example for the Lorenz 96 dynamical system. In particular, we'll focus on the evaluations of the Lorenz 96 solver, written explicitly in GModel.jl, which contains the following loop over the N_ens-sized ensemble:
function run_ensembles(settings, lorenz_params, nd, N_ens)
g_ens = zeros(nd, N_ens)
for i in 1:N_ens
# run the model with the current parameters, i.e., map θ to G(θ)
g_ens[:, i] = lorenz_forward(settings, lorenz_params[i])
end
return g_ens
end
Each ensemble member i runs the Lorenz 96 model with the settings configuration and the model parameters lorenz_params[i]. The runs do not interact with each other, so the user has several options for parallelizing them.
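For orientation, the sketch below (a hypothetical calibrate! driver, not code from the example) shows where run_ensembles sits relative to the EKP update: the forward-model evaluations are the parallelizable part, while the update itself stays serial. It assumes each member's parameters can be passed as a plain vector, whereas the actual example wraps them in its own parameter structure.
import EnsembleKalmanProcesses as EKP

# Sketch only: `ekpobj` (an EnsembleKalmanProcess), `prior`, `settings`, `nd`,
# `N_ens` and `N_iterations` are assumed to be set up as in the Lorenz example.
function calibrate!(ekpobj, prior, settings, nd, N_ens, N_iterations)
    for n in 1:N_iterations
        # constrained parameters of the current ensemble, one column per member
        params_n = EKP.get_ϕ_final(prior, ekpobj)
        # the embarrassingly parallel part: N_ens independent forward-model runs
        g_ens = run_ensembles(settings, [params_n[:, i] for i in 1:N_ens], nd, N_ens)
        # the (serial) EKP update
        EKP.update_ensemble!(ekpobj, g_ens)
    end
    return ekpobj
end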
Running the examples:
All of the following example cases are covered in distributed_Lorenz_example.jl. At the top of the file, uncomment one of the following options:
case = multithread
case = pmap
case = distfor
Multithreading, @threads
To parallelize with multithreading, Julia must be launched with a prespecified number of threads. For example, to call the file with 4 threads:
$ julia --project -t 4 distributed_Lorenz_example.jl
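As a sanity check (not part of the example file), one can confirm inside the script that the requested number of threads is actually available:
# optional: verify the thread count before running the threaded ensemble loop
@assert Threads.nthreads() == 4 "restart Julia with `-t 4` to use 4 threads"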
We exploit the multithreading over the N_ens ensemble members in this example with the following loop in GModel_multithread.jl:
function run_ensembles(settings, lorenz_params, nd, N_ens)
g_ens = zeros(nd, N_ens)
Threads.@threads for i in 1:N_ens
# run the model with the current parameters, i.e., map θ to G(θ)
g_ens[:, i] = lorenz_forward(settings, lorenz_params[i])
end
return g_ens
end
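Each iteration writes to a distinct column of g_ens, so the loop is safe to run on multiple threads provided lorenz_forward itself does not mutate shared state.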
You can read more about multi-threading here.
Parallel map, pmap
When using multiple processes, the Julia environment must first be loaded on each worker process. We include these lines in the main file:
using Distributed
addprocs(4; exeflags = "--project")
The file is then called with
$ julia --project distributed_Lorenz_example.jl
This ensures that we obtain 4 worker processes loaded with Julia's current environment specified by --project (unlike when calling julia --project -p 4). We use pmap to apply a function to each element of a list (i.e., the ensemble member configurations). For example, see the following code from GModel_pmap.jl:
using Distributed
function run_ensembles(settings, lorenz_params, nd, N_ens)
g_ens = zeros(nd, N_ens)
g_ens[:, :] = vcat(pmap(x -> lorenz_forward(settings, x), lorenz_params)...)
return g_ens
end
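Here pmap returns a list with one length-nd result per ensemble member; the vcat followed by the column-major assignment into g_ens places the output of member i in column i.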
If pmap is called within a module, that module will also need to be loaded on all workers. For this we use the macro @everywhere module XYZ.
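A minimal sketch of this pattern, using a hypothetical module XYZ with a placeholder forward map (not part of the Lorenz example):
using Distributed
addprocs(4; exeflags = "--project")

# define the forward-model code on the main process and on every worker,
# so that pmap can call XYZ.forward from any process
@everywhere module XYZ
    forward(x) = 2 .* x    # placeholder forward map
end

# each ensemble member's configuration is mapped through the forward model
g_list = pmap(x -> XYZ.forward(x), [rand(3) for _ in 1:8])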
You can read more about pmap here.
Distributed loop, @distributed for
When using multiple processes, the Julia environment must also be loaded on each worker process. We include these lines in the main file:
using Distributed
addprocs(4; exeflags = "--project")
The file is then called with
$ julia --project distributed_Lorenz_example.jl
When using distributed loops, it is necessary to be able to write to shared memory. To do this we use the SharedArrays package. For example, see the following distributed loop in GModel_distfor:
using Distributed
using SharedArrays
function run_ensembles(settings, lorenz_params, nd, N_ens)
g_ens = SharedArray{Float64}(nd, N_ens)
@sync @distributed for i in 1:N_ens
# run the model with the current parameters, i.e., map θ to G(θ)
g_ens[:, i] = lorenz_forward(settings, lorenz_params[i])
end
return g_ens
end
@sync forces the code to wait until all processes in the @distributed for loop are complete before continuing.
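Note that a SharedArray uses memory shared between processes on a single machine, so this approach requires all workers to run on the same node.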
If @distributed for is used within another module, that module will also need to be loaded on each worker process. For this we use the macro @everywhere module XYZ.
@distributed for is most performant when there is a large ensemble, N_ens, and the forward map is computationally cheap. Otherwise, pmap is usually the preferred choice.
You can read more about @distributed here.
Case 2: HPC interface
Some applications involve interfacing with non-Julia code or using HPC workload managers. In these cases we suggest an alternative workflow in which one interleaves scripts that launch EKP updates with scripts that run the forward model. One possible implementation is the following loop:
Step 0 (a). Write an ensemble of parameter files parameters_0_i for i = 1:N_ens, with each parameter file containing a sample from the prior distribution.
Step 0 (b). Construct the EKP object and save it in jld2 format, e.g. ekpobject.jld2.
for n = 1, ..., N_it, do:
- Step n (a). Run N_ens forward models, with forward model i running with the corresponding parameter file parameters_{n-1}_i. Write the processed output data to file data_{n-1}_i.
- Step n (b). Run the ensemble update by loading both ekpobject.jld2 and the data files data_{n-1}_i for i = 1:N_ens. Perform the EKP update step. Write new parameter files parameters_n_i for i = 1:N_ens. Save the ensemble object in ekpobject.jld2.
- Iterate n -> n+1.
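As an illustration, the Julia side of Step n (b) could look like the following sketch. The plain-text file layout (read and written here with the standard-library DelimitedFiles) and the function name ekp_update_step are hypothetical; in practice one would use the TOML utilities mentioned above for the parameter files.
using JLD2, DelimitedFiles
import EnsembleKalmanProcesses as EKP

function ekp_update_step(n, N_ens)
    # load the EKP object persisted at the previous step
    ekpobj = load_object("ekpobject.jld2")

    # read the processed forward-model output of each ensemble member
    g_ens = hcat([vec(readdlm("data_$(n - 1)_$i")) for i in 1:N_ens]...)

    # perform the EKP update with the collected evaluations G(θ)
    EKP.update_ensemble!(ekpobj, g_ens)

    # write new parameter files for the next round of forward-model runs
    # (here the unconstrained parameters; a real workflow may instead write
    # constrained parameters, e.g. via get_ϕ_final)
    u_new = EKP.get_u_final(ekpobj)    # columns = ensemble members
    for i in 1:N_ens
        writedlm("parameters_$(n)_$i", u_new[:, i])
    end

    # persist the updated EKP object for the next iteration
    save_object("ekpobject.jld2", ekpobj)
end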
In the HPC interfacing example: ClimateMachine, we implement a similar loop to interface with a SLURM workload manager for HPC. Here, sbatch scripts are used to run each component of the calibration procedure. The outer loop over the EKP iterations lives in the overarching sbatch script, and for each iteration the inner loop is realised as an "array" of SLURM jobs (1, ..., N_ens), launched one per ensemble member. The code excerpt below, taken from ekp_calibration.sbatch, illustrates this procedure:
# First call to calibrate.jl will create the ensemble files from the priors
id_init_ens=$(sbatch --parsable ekp_init_calibration.sbatch)
for it in $(seq 1 1 $n_it)
do
# Parallel runs of forward model
if [ "$it" = "1" ]; then
id_ens_array=$(sbatch --parsable --kill-on-invalid-dep=yes --dependency=afterok:$id_init_ens --array=1-$n ekp_single_cm_run.sbatch $it)
else
id_ens_array=$(sbatch --parsable --kill-on-invalid-dep=yes --dependency=afterok:$id_ek_upd --array=1-$n ekp_single_cm_run.sbatch $it)
fi
id_ek_upd=$(sbatch --parsable --kill-on-invalid-dep=yes --dependency=afterok:$id_ens_array --export=n=$n ekp_cont_calibration.sbatch $it)
done
Here a dependency tree is set up in SLURM, which iterates calls to the scripts ekp_single_cm_run.sbatch (which runs the forward model on the HPC cluster) and ekp_cont_calibration.sbatch (which performs an EKI update). We find this a smooth workflow that makes good use of HPC resources, and it can likely be set up with other workload managers.
For more details see the code and docs for the HPC interfacing example: ClimateMachine.