API
Model Interface
ClimaCalibrate.forward_model
— Functionforward_model(iteration, member)
Execute the forward model simulation with the given configuration.
This function must be overridden by a component's model interface and should set things like the parameter path and other member-specific settings.
ClimaCalibrate.observation_map
— Functionobservation_map(iteration)
Runs the observation map for the specified iteration. This function must be implemented for each calibration experiment.
ClimaCalibrate.analyze_iteration
— Functionanalyze_iteration(ekp, g_ensemble, prior, output_dir, iteration)
After each evaluation of the observation map and before updating the ensemble, analyze_iteration
is evaluated.
This function is optional to implement.
For example, one may want to print information from the eki
object or plot g_ensemble
.
ClimaCalibrate.postprocess_g_ensemble
— Functionpostprocess_g_ensemble(ekp, g_ensemble, prior, output_dir, iteration)
Postprocess g_ensemble
after evaluating the observation map and before updating the ensemble.
Worker Interface
ClimaCalibrate.add_workers
— Functionadd_workers(
nworkers;
device = :gpu,
cluster = :auto,
time = DEFAULT_WALLTIME,
kwargs...
)
Add nworkers
worker processes to the current Julia session, automatically detecting and configuring for the available computing environment.
Arguments
nworkers::Int
: The number of worker processes to add.device::Symbol = :gpu
: The target compute device type, either:gpu
(1 GPU, 4 CPU cores) or:cpu
(1 CPU core).cluster::Symbol = :auto
: The cluster management system to use. Options::auto
: Auto-detect available cluster environment (SLURM, PBS, or local):slurm
: Force use of SLURM scheduler:pbs
: Force use of PBS scheduler:local
: Force use of local processing (standardaddprocs
)
time::Int = DEFAULT_WALLTIME
: Walltime in minutes, will be formatted appropriately for the cluster systemkwargs
: Other kwargs can be passed directly through toaddprocs
.
ClimaCalibrate.WorkerBackend
— TypeWorkerBackend
Used to run calibrations on Distributed.jl's workers. For use on a Slurm cluster, see SlurmManager
.
ClimaCalibrate.SlurmManager
— TypeSlurmManager(ntasks=get(ENV, "SLURM_NTASKS", 1))
The ClusterManager for Slurm clusters, taking in the number of tasks to request with srun
.
To execute the srun
command, run addprocs(SlurmManager(ntasks))
Keyword arguments can be passed to srun
: addprocs(SlurmManager(ntasks), gpus_per_task=1)
By default the workers will inherit the running Julia environment.
To run a calibration, call calibrate(WorkerBackend, ...)
To run functions on a worker, call remotecall(func, worker_id, args...)
ClimaCalibrate.PBSManager
— TypePBSManager(ntasks)
The ClusterManager for PBS/Torque clusters, taking in the number of tasks to request with qsub
.
To execute the qsub
command, run addprocs(PBSManager(ntasks))
. Unlike the SlurmManager
, this will not nest scheduled jobs, but will acquire new resources.
Keyword arguments can be passed to qsub
: addprocs(PBSManager(ntasks), nodes=2)
By default, the workers will inherit the running Julia environment.
To run a calibration, call calibrate(WorkerBackend, ...)
To run functions on a worker, call remotecall(func, worker_id, args...)
ClimaCalibrate.set_worker_loggers
— Functionset_worker_loggers(workers = workers())
Set the global logger to a simple file logger for the given workers.
ClimaCalibrate.map_remotecall_fetch
— Functionmap_remotecall_fetch(f::Function, args...; workers = workers())
Call function f
from each worker and wait for the results to return.
ClimaCalibrate.foreach_remotecall_wait
— Functionforeach_remotecall_wait(f::Function, args...; workers = workers())
Call function f
from each worker.
Backend Interface
ClimaCalibrate.calibrate
— Functioncalibrate(backend, ekp::EnsembleKalmanProcess, ensemble_size, n_iterations, prior, output_dir)
calibrate(backend, ensemble_size, n_iterations, observations, noise, prior, output_dir; ekp_kwargs...)
Run a full calibration on the given backend.
If the EKP struct is not given, it will be constructed upon initialization. While EKP keyword arguments are passed through to the EKP constructor, if using many keywords it is recommended to construct the EKP object and pass it into calibrate
.
Available Backends: WorkerBackend, CaltechHPCBackend, ClimaGPUBackend, DerechoBackend, JuliaBackend
Derecho, ClimaGPU, and CaltechHPC backends are designed to run on a specific high-performance computing cluster. WorkerBackend uses Distributed.jl to run the forward model on workers.
Keyword Arguments for HPC backends
- `model_interface: Path to the model interface file.
hpc_kwargs
: Dictionary of resource arguments for HPC clusters, passed to the job scheduler.verbose::Bool
: Enable verbose logging.- Any keyword arguments for the EnsembleKalmanProcess constructor, such as
scheduler
ClimaCalibrate.JuliaBackend
— TypeJuliaBackend
The simplest backend, used to run a calibration in Julia without any parallelization.
ClimaCalibrate.DerechoBackend
— TypeDerechoBackend
Used for NSF NCAR's Derecho supercomputing system.
ClimaCalibrate.CaltechHPCBackend
— TypeCaltechHPCBackend
Used for Caltech's high-performance computing cluster.
ClimaCalibrate.ClimaGPUBackend
— TypeClimaGPUBackend
Used for CliMA's private GPU server.
ClimaCalibrate.get_backend
— Functionget_backend()
Get ideal backend for deploying forward model runs. Each backend is found via gethostname()
. Defaults to JuliaBackend if none is found.
ClimaCalibrate.model_run
— Functionmodel_run(backend, iter, member, output_dir, experiment_dir; model_interface, verbose, hpc_kwargs)
Construct and execute a command to run a single forward model on a given job scheduler.
Uses the given backend
to run slurm_model_run
or pbs_model_run
.
Arguments:
- iter: Iteration number
- member: Member number
- output_dir: Calibration experiment output directory
- project_dir: Directory containing the experiment's Project.toml
- model_interface: Model interface file
- moduleloadstr: Commands which load the necessary modules
- hpc_kwargs: Dictionary containing the resources for the job. Easily generated using
kwargs
.
ClimaCalibrate.module_load_string
— Functionmodule_load_string(backend)
Return a string that loads the correct modules for a given backend when executed via bash.
Job Scheduler
ClimaCalibrate.wait_for_jobs
— Functionwait_for_jobs(jobids, output_dir, iter, experiment_dir, model_interface, module_load_str, model_run_func; verbose, hpc_kwargs, reruns=1)
Wait for a set of jobs to complete. If a job fails, it will be rerun up to reruns
times.
This function monitors the status of multiple jobs and handles failures by rerunning the failed jobs up to the specified number of reruns
. It logs errors and job completion status, ensuring all jobs are completed before proceeding.
Arguments:
jobids
: Vector of job IDs.output_dir
: Directory for output.iter
: Iteration number.experiment_dir
: Directory for the experiment.model_interface
: Interface to the model.module_load_str
: Commands to load necessary modules.model_run_func
: Function to run the model.verbose
: Print detailed logs if true.hpc_kwargs
: HPC job parameters.reruns
: Number of times to rerun failed jobs.
ClimaCalibrate.log_member_error
— Functionlog_member_error(output_dir, iteration, member, verbose=false)
Log a warning message when an error occurs. If verbose, includes the ensemble member's output.
ClimaCalibrate.kill_job
— Functionkill_job(jobid::SlurmJobID)
kill_job(jobid::PBSJobID)
End a running job, catching errors in case the job can not be ended.
ClimaCalibrate.job_status
— Functionjob_status(job_id)
Parse the slurm job_id's state and return one of three status symbols: :PENDING, :RUNNING, or :COMPLETED.
ClimaCalibrate.kwargs
— Functionkwargs(; kwargs...)
Create a dictionary from keyword arguments.
ClimaCalibrate.slurm_model_run
— Functionslurm_model_run(iter, member, output_dir, experiment_dir, model_interface, module_load_str; hpc_kwargs)
Construct and execute a command to run a single forward model on Slurm. Helper function for model_run
.
ClimaCalibrate.generate_sbatch_script
— Functiongenerate_sbatch_script(iter, member, output_dir, experiment_dir, model_interface; module_load_str, hpc_kwargs, exeflags="")
Generate a string containing an sbatch script to run the forward model. hpc_kwargs
is turned into a series of sbatch directives using generate_sbatch_directives
. module_load_str
is used to load the necessary modules and can be obtained via module_load_string
. exeflags
is a string of flags to pass to the Julia executable (defaults to empty string).
ClimaCalibrate.generate_sbatch_directives
— Functiongenerate_sbatch_directives(hpc_kwargs)
Generate Slurm sbatch directives from HPC kwargs.
ClimaCalibrate.submit_slurm_job
— Functionsubmit_slurm_job(sbatch_filepath; env=deepcopy(ENV))
Submit a job to the Slurm scheduler using sbatch, removing unwanted environment variables.
Unset variables: "SLURMMEMPERCPU", "SLURMMEMPERGPU", "SLURMMEMPER_NODE"
ClimaCalibrate.pbs_model_run
— Functionpbs_model_run(iter, member, output_dir, experiment_dir, model_interface, module_load_str; hpc_kwargs)
Construct and execute a command to run a single forward model on PBS Pro. Helper function for model_run
.
ClimaCalibrate.generate_pbs_script
— Functiongeneratepbsscript( iter, member, outputdir, experimentdir, modelinterface; moduleloadstr, hpckwargs, )
Generate a string containing a PBS script to run the forward model.
Returns:
qsub_contents::Function
: A function generating the content of the PBS script based on the provided arguments. This will run the contents of thejulia_script
, which have to be run from a file due to Derecho'sset_gpu_rank
.julia_script::String
: The Julia script string to be executed by the PBS job.
Helper function for pbs_model_run
.
ClimaCalibrate.submit_pbs_job
— Functionsubmit_pbs_job(sbatch_filepath; env=deepcopy(ENV))
Submit a job to the PBS Pro scheduler using qsub, removing unwanted environment variables.
Unset variables: "PBSMEMPERCPU", "PBSMEMPERGPU", "PBSMEMPER_NODE"
EnsembleKalmanProcesses Interface
ClimaCalibrate.initialize
— Functioninitialize(eki::EKP.EnsembleKalmanProcess, prior, output_dir)
initialize(ensemble_size, observations, noise, prior, output_dir)
Initialize a calibration, saving the initial parameter ensemble to a folder within output_dir
.
If no EKP struct is given, construct an EKP struct and return it.
ClimaCalibrate.save_G_ensemble
— Functionsave_G_ensemble(output_dir::AbstractString, iteration, G_ensemble)
Saves the ensemble's observation map output to the correct directory based on the provided configuration. Takes an output directory, iteration number, and the ensemble output to save.
ClimaCalibrate.update_ensemble
— Functionupdate_ensemble(output_dir::AbstractString, iteration, prior)
Updates the EnsembleKalmanProcess object and saves the parameters for the next iteration.
ClimaCalibrate.update_ensemble!
— Functionupdate_ensemble!(ekp, G_ens, output_dir, iteration, prior)
Updates an EKP object with data G_ens, saving the object and final parameters to disk.
ClimaCalibrate.observation_map_and_update!
— Functionobservation_map_and_update!(ekp, output_dir, iteration, prior)
Compute the observation map and update the given EKP object.
ClimaCalibrate.get_prior
— Functionget_prior(param_dict::AbstractDict; names = nothing)
get_prior(prior_path::AbstractString; names = nothing)
Constructs the combined prior distribution from a param_dict
or a TOML configuration file specified by prior_path
. If names
is provided, only those parameters are used.
ClimaCalibrate.get_param_dict
— Functionget_param_dict(distribution; names)
Generates a dictionary for parameters based on the specified distribution, assumed to be of floating-point type. If names
is not provided, the distribution's names will be used.
ClimaCalibrate.path_to_iteration
— Functionpath_to_iteration(output_dir, iteration)
Return the path to the directory for a given iteration within the specified output directory.
ClimaCalibrate.path_to_ensemble_member
— Functionpath_to_ensemble_member(output_dir, iteration, member)
Return the path to an ensemble member's directory for a given iteration and member number.
ClimaCalibrate.path_to_model_log
— Functionpath_to_model_log(output_dir, iteration, member)
Return the path to an ensemble member's forward model log for a given iteration and member number.
ClimaCalibrate.parameter_path
— Functionparameter_path(output_dir, iteration, member)
Return the path to an ensemble member's parameter file.
ClimaCalibrate.minibatcher_over_samples
— Functionminibatcher_over_samples(n_samples, batch_size)
Create a FixedMinibatcher
that divides n_samples
into batches of size batch_size
.
If n_samples
is not divisible by batch_size
, the remaining samples will be dropped.
minibatcher_over_samples(samples, batch_size)
Create a FixedMinibatcher
that divides a vector of samples into batches of size batch_size
.
If the number of samples is not divisible by batch_size
, the remaining samples will be dropped.
ClimaCalibrate.observation_series_from_samples
— Functionobservation_series_from_samples(samples, batch_size, names = nothing)
Create an EKP.ObservationSeries
from a vector of EKP.Observation
samples.
If the number of samples is not divisible by batch_size
, the remaining samples will be dropped.
ClimaCalibrate.load_latest_ekp
— Functionload_latest_ekp(output_dir)
Return the most recent EnsembleKalmanProcess struct from the given output directory.
Returns nothing if no EKP structs are found.
Observation Recipe Interface
ClimaCalibrate.ObservationRecipe.AbstractCovarianceEstimator
— Typeabstract type AbstractCovarianceEstimator end
An object that estimates the noise covariance matrix from observational data that is appropriate for a sample between start_date
and end_date
.
AbstractCovarianceEstimator
have to provide one function, ObservationRecipe.covariance
.
The function has to have the signature
ObservationRecipe.covariance(
covar_estimator::AbstractCovarianceEstimator,
vars,
start_date,
end_date,
)
and return a noise covariance matrix.
ClimaCalibrate.ObservationRecipe.SeasonalDiagonalCovariance
— TypeSeasonalDiagonalCovariance <: AbstractCovarianceEstimator
Contain the necessary information to construct a diagonal covariance matrix whose entries represents seasonal covariances from ClimaAnalysis.OutputVar
s.
ClimaCalibrate.ObservationRecipe.SeasonalDiagonalCovariance
— MethodSeasonalDiagonalCovariance(model_error_scale = 0.0,
regularization = 0.0,
ignore_nan = true)
Create a SeasonalDiagonalCovariance
which specifies how the covariance matrix should be formed. When used with ObservationRecipe.observation
or ObservationRecipe.covariance
, return a Diagonal
matrix.
Keyword arguments
model_error_scale
: Noise from the model error added to the covariance matrix. This is(model_error_scale * seasonal_mean).^2
, whereseasonal_mean
is the seasonal mean for each of the quantity for each of the season (DJF, MAM, JJA, SON).regularization
: A diagonal matrix of the formregularization * I
is added to the covariance matrix.ignore_nan
: Iftrue
, thenNaN
s are ignored when computing the covariance matrix. Otherwise,NaN
are included in the intermediate calculation of the covariance matrix. Note that allNaN
s are removed in the last step of forming the covariance matrix even ifignore_nan
isfalse
.
ClimaCalibrate.ObservationRecipe.SVDplusDCovariance
— TypeSVDplusDCovariance <: AbstractCovarianceEstimator
Contain the necessary information to construct a EKP.SVDplusD
covariance matrix from ClimaAnalysis.OutputVar
s.
ClimaCalibrate.ObservationRecipe.SVDplusDCovariance
— MethodSVDplusDCovariance(sample_date_ranges;
model_error_scale = 0.0,
regularization = 0.0,
Create a SVDplusDCovariance
which specifies how the covariance matrix should be formed. When used with ObservationRecipe.observation
or ObservationRecipe.covariance
, return a EKP.SVDplusD
covariance matrix.
For sample_date_ranges
, it is recommended that each sample contains data from a single year. For example, if the samples are created from time series data of seasonal averages, then each sample should contain all four seasons. Otherwise, the covariance matrix may not make sense. For example, if each sample contains two years of seasonally averaged data, then the sample mean is the seasonal mean of every other season across the years stacked vertically. For a concrete example, if the sample contain DJF for both 2010 and 2011. Then, the sample mean will be of mean of DJF 2010, 2012, and so on, and the mean of DJF 2011, 2013, and so on. As a result, if one were to use this covariance matrix with model_error_scale
, the covariance matrix will not make sense.
Positional arguments
sample_date_ranges
: The start and end dates of each samples. This is used to determine the sample from the time series data of theOutputVar
s. These dates must be present in all theOutputVar
s.
Keyword arguments
model_error_scale
: Noise from the model error added to the covariance matrix. This is(model_error_scale * mean(samples, dims = 2)).^2
, wheremean(samples, dims = 2)
is the mean of the samples.regularization
: A diagonal matrix of the formregularization * I
is added to the covariance matrix.
ClimaCalibrate.ObservationRecipe.covariance
— Functioncovariance(covar_estimator::SeasonalDiagonalCovariance,
vars::Union{OutputVar, Iterable{OutputVar}},
start_date,
end_date)
Compute the noise covariance matrix of seasonal quantities from var
that is appropriate for a sample of seasonal quantities across time for seasons between start_date
and end_date
.
The diagonal is computed from the variances of the seasonal quantities.
covariance(covar_estimator::SVDplusDCovariance,
vars::Union{OutputVar, Iterable{OutputVar}},
start_date,
end_date)
Compute the EKP.SVDplusD
covariance matrix appropriate for a sample with times between start_date
and end_date
.
ClimaCalibrate.ObservationRecipe.observation
— Functionobservation(covar_estimator::AbstractCovarianceEstimator,
vars,
start_date,
end_date;
name = nothing)
Return an EKP.Observation
with a sample between the dates start_date
and end_date
, a covariance matrix defined by covar_estimator
, name
determined from the short names of vars
, and metadata.
ClimaCalibrate.ObservationRecipe.seasonally_aligned_yearly_sample_date_ranges
— Functionseasonally_aligned_yearly_sample_date_ranges(var::OutputVar)
Generate sample dates that conform to a seasonally aligned year from dates(var)
.
A seasonally aligned year is defined to be from December to November of the following year.
This function is useful for finding the sample dates of samples consisting of all four seasons in a single year. For example, one can use this function to find the sample_date_ranges
when constructing SVDplusDCovariance
.
ClimaCalibrate.ObservationRecipe.change_data_type
— FunctionObservationRecipe.change_data_type(var::OutputVar, data_type)
Return a OutputVar
with data
of type data_type
.
This is useful if you want to make covariance matrix whose element type is data_type
.