Distributed Calibration Tutorial Using Julia Workers
This example will teach you how to use ClimaCalibrate to parallelize your calibration with workers. Workers are additional processes spun up to run code in a distributed fashion. In this tutorial, we will run ensemble members' forward models on different workers.
The example calibration uses CliMA's atmosphere model, ClimaAtmos.jl, in a column spatial configuration for 30 days to simulate outgoing radiative fluxes. Radiative fluxes are used in the observation map to calibrate the astronomical unit.
First, we load in some necessary packages.
using Distributed
import ClimaCalibrate as CAL
import ClimaAnalysis: SimDir, get, slice, average_xy
using ClimaUtilities.ClimaArtifacts
import EnsembleKalmanProcesses as EKP
import EnsembleKalmanProcesses: I, ParameterDistributions.constrained_gaussian

Next, we add workers. These are primarily added by Distributed.addprocs or by starting Julia with multiple processes: julia -p <nprocs>.
addprocs initializes the workers and registers them with the main Julia process, but there are multiple ways to call it. The simplest is addprocs(nprocs), which creates new local processes on your machine. Another is to pass ClimaCalibrate.SlurmManager, which acquires and starts workers on Slurm resources. You can use keyword arguments to specify the Slurm resources:
addprocs(ClimaCalibrate.SlurmManager(nprocs), gpus_per_task = 1, time = "01:00:00")
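For local runs, addprocs also accepts standard Distributed keyword arguments such as exeflags. A minimal sketch (the flag value here is illustrative):

```julia
using Distributed

# Start two local workers that inherit the current project environment.
# `exeflags` passes extra command-line flags to each worker process.
addprocs(2; exeflags = "--project=$(Base.active_project())")
n = nworkers()      # two workers are now available
rmprocs(workers())  # clean up the demo workers
```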
For this example, we would add one worker if it were compatible with Documenter.jl:

addprocs(1)

We can see the number of workers and their ID numbers:

nworkers()

1

workers()

1-element Vector{Int64}:
 1

We can call functions on a worker using remotecall. We pass in the function and the worker ID, followed by the function arguments:

remotecall_fetch(*, 1, 4, 4)

16

ClimaCalibrate uses this functionality to run the forward model on workers.
Since the workers start in their own Julia sessions, we need to import packages and declare variables. Distributed.@everywhere executes code on all workers, allowing us to load the code that they need.
@everywhere begin
output_dir = joinpath("output", "climaatmos_calibration")
import ClimaCalibrate as CAL
import ClimaAtmos as CA
import ClimaComms
end
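Each worker now has these packages loaded and output_dir defined in its own session. As a self-contained sketch of this mechanism (the variable name demo_dir is illustrative), a value defined with @everywhere can be read back from a worker:

```julia
using Distributed

addprocs(1)  # one local worker for the demo
# Define the variable on the main process and on every worker
@everywhere demo_dir = joinpath("output", "climaatmos_calibration")
w = first(workers())
# The anonymous function runs on worker w, which resolves its own demo_dir
fetched = remotecall_fetch(() -> demo_dir, w)
rmprocs(workers())  # clean up
```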
output_dir = joinpath("output", "climaatmos_calibration")
mkpath(output_dir)

"output/climaatmos_calibration"

First, we need to set up the forward model, which takes in the sampled parameters, runs the simulation, and saves diagnostic output that can be processed and compared to observations. The forward model must override ClimaCalibrate.forward_model(iteration, member), since the workers will run this function in parallel.
Because forward_model(iteration, member) only takes in the iteration and member numbers, we need to use these as hooks to set the model parameters and output directory. Two useful functions:
- path_to_ensemble_member: returns the ensemble member's output directory
- parameter_path: returns the ensemble member's parameter file as specified by EKP.TOMLInterface
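For orientation, these helpers lay out each member's directory under the output directory using zero-padded iteration and member numbers, as seen in the simulation printout later in this tutorial. A plain-Julia sketch of the naming scheme, for illustration only; use CAL.path_to_ensemble_member in real code:

```julia
using Printf

# Hypothetical re-implementation of the directory layout, for illustration.
member_dir(output_dir, iteration, member) = joinpath(
    output_dir,
    @sprintf("iteration_%03d", iteration),
    @sprintf("member_%03d", member),
)

path = member_dir("output/climaatmos_calibration", 0, 1)
# e.g. "output/climaatmos_calibration/iteration_000/member_001"
```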
The forward model below is running ClimaAtmos.jl in a minimal column spatial configuration.
@everywhere function CAL.forward_model(iteration, member)
config_dict = Dict(
"dt" => "2000secs",
"t_end" => "30days",
"config" => "column",
"h_elem" => 1,
"insolation" => "timevarying",
"output_dir" => output_dir,
"output_default_diagnostics" => false,
"dt_rad" => "6hours",
"rad" => "clearsky",
"co2_model" => "fixed",
"log_progress" => false,
"diagnostics" => [
Dict(
"reduction_time" => "average",
"short_name" => "rsut",
"period" => "30days",
"writer" => "nc",
),
],
)
#md # Set the output path for the current member
member_path = CAL.path_to_ensemble_member(output_dir, iteration, member)
config_dict["output_dir"] = member_path
#md # Set the parameters for the current member
parameter_path = CAL.parameter_path(output_dir, iteration, member)
if haskey(config_dict, "toml")
push!(config_dict["toml"], parameter_path)
else
config_dict["toml"] = [parameter_path]
end
#md # Turn off default diagnostics
config_dict["output_default_diagnostics"] = false
comms_ctx = ClimaComms.SingletonCommsContext()
atmos_config = CA.AtmosConfig(config_dict; comms_ctx)
simulation = CA.get_simulation(atmos_config)
CA.solve_atmos!(simulation)
return simulation
end

Next, the observation map is required to process a full ensemble of model output for the ensemble update step. The observation map takes in the iteration number and always outputs an array. For observation map output G_ensemble, G_ensemble[:, m] must be the output of ensemble member m. This is needed for compatibility with EnsembleKalmanProcesses.jl.
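As a standalone illustration of this convention (plain Julia, no ClimaCalibrate), each column of the output array holds one member's processed output:

```julia
# Sketch: 1 observed quantity, 30 members, as in this tutorial.
n_obs, n_members = 1, 30
G = Array{Float64}(undef, n_obs, n_members)
for m in 1:n_members
    G[:, m] .= float(m)  # stand-in for process_member_data(simdir)
end
size(G)  # (1, 30): column m is the output of member m
```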
const days = 86_400
function CAL.observation_map(iteration)
single_member_dims = (1,)
G_ensemble = Array{Float64}(undef, single_member_dims..., ensemble_size)
for m in 1:ensemble_size
member_path = CAL.path_to_ensemble_member(output_dir, iteration, m)
simdir_path = joinpath(member_path, "output_active")
if isdir(simdir_path)
simdir = SimDir(simdir_path)
G_ensemble[:, m] .= process_member_data(simdir)
else
G_ensemble[:, m] .= NaN
end
end
return G_ensemble
end

Separating out the processing of each ensemble member's output often results in more readable code.
function process_member_data(simdir::SimDir)
isempty(simdir.vars) && return NaN
rsut =
get(simdir; short_name = "rsut", reduction = "average", period = "30d")
return slice(average_xy(rsut); time = 30days).data
end

process_member_data (generic function with 1 method)

Now, we can set up the remaining experiment details:
- ensemble size, number of iterations
- the prior distribution
- the observational data
ensemble_size = 30
n_iterations = 7
noise = 0.1 * I
prior = constrained_gaussian("astronomical_unit", 6e10, 1e11, 2e5, Inf)

ParameterDistribution with 1 entries:
'astronomical_unit' with EnsembleKalmanProcesses.ParameterDistributions.Constraint{EnsembleKalmanProcesses.ParameterDistributions.BoundedBelow}[Bounds: (200000.0, ∞)] over distribution EnsembleKalmanProcesses.ParameterDistributions.Parameterized(Distributions.Normal{Float64}(μ=24.153036641203013, σ=1.1528837102037748))
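To sanity-check a prior like this, one can draw an ensemble in unconstrained space and map it back to constrained (physical) space, where samples respect the lower bound of 2e5. A sketch assuming the standard EnsembleKalmanProcesses API:

```julia
import EnsembleKalmanProcesses as EKP
import EnsembleKalmanProcesses.ParameterDistributions:
    constrained_gaussian, transform_unconstrained_to_constrained
import Random

prior = constrained_gaussian("astronomical_unit", 6e10, 1e11, 2e5, Inf)
# Draw 5 unconstrained samples, then transform to constrained space
u = EKP.construct_initial_ensemble(Random.MersenneTwister(42), prior, 5)
phys = transform_unconstrained_to_constrained(prior, u)
all(phys .> 2e5)  # every sample respects the lower bound
```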
For a perfect model, we generate observations from the forward model itself. This is most easily done by creating an empty parameter file and running the 0th ensemble member:
@info "Generating observations"
parameter_file = CAL.parameter_path(output_dir, 0, 0)
mkpath(dirname(parameter_file))
touch(parameter_file)
simulation = CAL.forward_model(0, 0)

Simulation
├── Running on: CPUSingleThreaded
├── Output folder: output/climaatmos_calibration/iteration_000/member_000/output_0000
├── Start date: 2010-01-01T00:00:00
├── Current time: 2.592e6 seconds
└── Stop time: 2.592e6 seconds

Lastly, we use the observation map itself to generate the observations.
observations = Vector{Float64}(undef, 1)
observations .= process_member_data(SimDir(simulation.output_dir))

1-element Vector{Float64}:
 126.61351013183594

Now we are ready to run our calibration, putting it all together using the calibrate function. The WorkerBackend will automatically use all workers available to the main Julia process. Other backends are available for forward models that cannot use workers or need to be parallelized internally. The simplest is the JuliaBackend, which runs all ensemble members sequentially and does not require Distributed.jl. For more information, see the Backends page.
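For reference, swapping in the sequential backend only changes the first argument of calibrate; everything else in the setup below stays the same (a sketch, not run here):

```julia
# Runs every ensemble member sequentially in the current process;
# no Distributed.jl workers required.
eki = CAL.calibrate(CAL.JuliaBackend(), ekp, n_iterations, prior, output_dir)
```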
user_initial_ensemble = EKP.construct_initial_ensemble(prior, ensemble_size)
ekp = EKP.EnsembleKalmanProcess(
user_initial_ensemble,
observations,
noise,
EKP.Inversion(),
EKP.default_options_dict(EKP.Inversion()),
)
eki = CAL.calibrate(CAL.WorkerBackend(), ekp, n_iterations, prior, output_dir)

calibrate returns the final EnsembleKalmanProcess, which stores the parameter ensembles, the observation, and error metrics (e.g. RMSE and loss) from each of the 7 iterations.

This page was generated using Literate.jl.